Skip to content

Fix FastLazy race condition and waiting thread hanging (#6336) #6707

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions src/benchmark/Akka.Benchmarks/Utils/FastLazyBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ public class FastLazyBenchmarks
private Lazy<int> lazySafe;
private Lazy<int> lazyUnsafe;
private FastLazy<int> fastLazy;
private FastLazy<int, int> fastLazyWithInit;


[GlobalSetup]
public void Setup()
{
lazySafe = new Lazy<int>(() => 100, LazyThreadSafetyMode.ExecutionAndPublication);
lazyUnsafe = new Lazy<int>(() => 100, LazyThreadSafetyMode.None);
fastLazy = new FastLazy<int>(() => 100);
fastLazyWithInit = new FastLazy<int, int>(state => state + 100, 1000);
}

[Benchmark(Baseline = true)]
Expand All @@ -48,11 +45,5 @@ public int FastLazy_get_value()
{
return fastLazy.Value;
}

[Benchmark]
public int FastLazy_stateful_get_value()
{
return fastLazyWithInit.Value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4996,14 +4996,8 @@ namespace Akka.Util
public sealed class FastLazy<T>
{
public FastLazy(System.Func<T> producer) { }
public bool IsValueCreated { get; }
public T Value { get; }
}
public sealed class FastLazy<S, T>
{
public FastLazy(System.Func<S, T> producer, S state) { }
public bool IsValueCreated { get; }
public T Value { get; }
public bool IsValueCreated() { }
Copy link
Contributor Author

@F0b0s F0b0s May 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to method because with
[MethodImpl(MethodImplOptions.AggressiveInlining)]
it shows slightly better performance

}
public interface ISurrogate
{
Expand Down
118 changes: 118 additions & 0 deletions src/core/Akka.Tests/Util/FastLazySpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
//-----------------------------------------------------------------------
// <copyright file="FastLazySpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Util;
using Xunit;

namespace Akka.Tests.Util;

public class FastLazySpecs
{
[Fact]
public void FastLazy_should_indicate_no_value_has_been_produced()
{
var fal = new FastLazy<int>(() => 2);
Assert.False(fal.IsValueCreated());
}

[Fact]
public void FastLazy_should_produce_value()
{
var fal = new FastLazy<int>(() => 2);
var value = fal.Value;
Assert.Equal(2, value);
Assert.True(fal.IsValueCreated());
}

[Fact]
public void FastLazy_must_be_threadsafe()
{
for (var c = 0; c < 100000; c++) // try this 100000 times
{
var values = new ConcurrentBag<int>();
var fal = new FastLazy<int>(() => new Random().Next(1, Int32.MaxValue));
var result = Parallel.For(0, 1000, i => values.Add(fal.Value)); // 1000 concurrent operations
SpinWait.SpinUntil(() => result.IsCompleted);
var value = values.First();
Assert.NotEqual(0, value);
Assert.True(values.All(x => x.Equals(value)));
}
}

[Fact]
public void FastLazy_only_single_value_creation_attempt()
{
int attempts = 0;
Func<int> slowValueFactory = () =>
{
Interlocked.Increment(ref attempts);
Thread.Sleep(100);
return new Random().Next(1, Int32.MaxValue);
};

var values = new ConcurrentBag<int>();
var fal = new FastLazy<int>(slowValueFactory);
var result = Parallel.For(0, 1000, i => values.Add(fal.Value)); // 1000 concurrent operations
SpinWait.SpinUntil(() => result.IsCompleted);
var value = values.First();
Assert.NotEqual(0, value);
Assert.True(values.All(x => x.Equals(value)));
Assert.Equal(1000, values.Count);
Assert.Equal(1, attempts);
}

[Fact]
public void FastLazy_must_be_threadsafe_AnyRef()
{
for (var c = 0; c < 100000; c++) // try this 100000 times
{
var values = new ConcurrentBag<string>();
var fal = new FastLazy<string>(() => Guid.NewGuid().ToString());
var result = Parallel.For(0, 1000, i => values.Add(fal.Value)); // 1000 concurrent operations
SpinWait.SpinUntil(() => result.IsCompleted);
var value = values.First();
Assert.NotNull(value);
Assert.True(values.All(x => x.Equals(value)));
}
}

[Fact]
public void FastLazy_only_single_value_creation_attempt_AnyRef()
{
int attempts = 0;
Func<string> slowValueFactory = () =>
{
Interlocked.Increment(ref attempts);
Thread.Sleep(100);
return Guid.NewGuid().ToString();
};

var values = new ConcurrentBag<string>();
var fal = new FastLazy<string>(slowValueFactory);
var result = Parallel.For(0, 1000, i => values.Add(fal.Value)); // 1000 concurrent operations
SpinWait.SpinUntil(() => result.IsCompleted);
var value = values.First();
Assert.NotNull(value);
Assert.True(values.All(x => x.Equals(value)));
Assert.Equal(1000, values.Count);
Assert.Equal(1, attempts);
}

[Fact]
public void FastLazy_AllThreads_ShouldThrowException_WhenFactoryThrowsException()
{
var lazy = new FastLazy<string>(() => throw new Exception("Factory exception"));
var result = Parallel.For(0, 10, i => { Assert.Throws<Exception>(() => _ = lazy.Value); });

Assert.True(result.IsCompleted);
}
}
145 changes: 28 additions & 117 deletions src/core/Akka/Util/FastLazy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,144 +15,55 @@ namespace Akka.Util
/// A fast, atomic lazy that only allows a single publish operation to happen,
/// but allows executions to occur concurrently.
///
/// Does not cache exceptions. Designed for use with <typeparamref name="T"/> types that are <see cref="IDisposable"/>
/// or are otherwise considered to be expensive to allocate.
///
/// Read the full explanation here: https://github.com/Aaronontheweb/FastAtomicLazy#rationale
/// Does not cache exceptions. Designed for use with <typeparam name="T"/> types that are <see cref="IDisposable"/>
/// or are otherwise considered to be expensive to allocate. Read the full explanation here: https://github.com/Aaronontheweb/FastAtomicLazy#rationale
/// </summary>
/// <typeparam name="T">TBD</typeparam>
public sealed class FastLazy<T>
{
private readonly Func<T> _producer;
private byte _created = 0;
private byte _creating = 0;
private Func<T> _producer;
private int _status = 0;
private Exception _exception;
private T _createdValue;

/// <summary>
/// Initializes a new instance of the <see cref="FastLazy{T}"/> class.
/// </summary>
/// <exception cref="ArgumentNullException">
/// This exception is thrown if the given <paramref name="producer"/> is undefined.
/// </exception>
public FastLazy(Func<T> producer)
{
if (producer == null) throw new ArgumentNullException(nameof(producer), "Producer cannot be null");
_producer = producer;
_producer = producer ?? throw new ArgumentNullException(nameof(producer));
}

/// <summary>
/// TBD
/// </summary>
/// <returns>TBD</returns>
public bool IsValueCreated => IsValueCreatedInternal();

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool IsValueCreatedInternal()
{
return Volatile.Read(ref _created) == 1;
}

public bool IsValueCreated() => Volatile.Read(ref _status) == 2;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool IsValueCreationInProgress()
{
return Volatile.Read(ref _creating) == 1;
}

/// <summary>
/// TBD
/// </summary>
private bool IsExceptionThrown() => Volatile.Read(ref _exception) != null;

public T Value
{
get
{
if (IsValueCreatedInternal())
if (IsValueCreated())
return _createdValue;
if (!IsValueCreationInProgress())
{
Volatile.Write(ref _creating, 1);
_createdValue = _producer();
Volatile.Write(ref _created, 1);
}
else

if (Interlocked.CompareExchange(ref _status, 1, 0) == 0)
{
SpinWait.SpinUntil(IsValueCreatedInternal);
}
return _createdValue;
}
}
}


/// <summary>
/// A fast, atomic lazy that only allows a single publish operation to happen,
/// but allows executions to occur concurrently.
///
/// Does not cache exceptions. Designed for use with <typeparamref name="T"/> types that are <see cref="IDisposable"/>
/// or are otherwise considered to be expensive to allocate.
///
/// Read the full explanation here: https://github.com/Aaronontheweb/FastAtomicLazy#rationale
/// </summary>
/// <typeparam name="S">State type</typeparam>
/// <typeparam name="T">Value type</typeparam>
public sealed class FastLazy<S, T>
{
private readonly Func<S, T> _producer;
private byte _created = 0;
private byte _creating = 0;
private T _createdValue;
private S _state;

/// <summary>
/// Initializes a new instance of the <see cref="FastLazy{T}"/> class.
/// </summary>
/// <exception cref="ArgumentNullException">
/// This exception is thrown if the given <paramref name="producer"/> or <paramref name="state"/> is undefined.
/// </exception>
public FastLazy(Func<S, T> producer, S state)
{
if(producer == null) throw new ArgumentNullException(nameof(producer), "Producer cannot be null");
if(state == null) throw new ArgumentNullException(nameof(state), "State cannot be null");
_producer = producer;
_state = state;
}

/// <summary>
/// TBD
/// </summary>
/// <returns>TBD</returns>
public bool IsValueCreated => IsValueCreatedInternal();

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool IsValueCreatedInternal()
{
return Volatile.Read(ref _created) == 1;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool IsValueCreationInProgress()
{
return Volatile.Read(ref _creating) == 1;
}
try
{
_createdValue = _producer();
}
catch (Exception e)
{
Volatile.Write(ref _exception, e);
throw;
}

/// <summary>
/// TBD
/// </summary>
public T Value
{
get
{
if (IsValueCreatedInternal())
return _createdValue;
if (!IsValueCreationInProgress())
{
Volatile.Write(ref _creating, 1);
_createdValue = _producer(_state);
Volatile.Write(ref _created, 1);
_state = default(S); // for reference types to make it suitable for gc
Volatile.Write(ref _status, 2);
_producer = null; // release for GC
}
else
{
SpinWait.SpinUntil(IsValueCreatedInternal);
SpinWait.SpinUntil(() => IsValueCreated() || IsExceptionThrown());
var e = Volatile.Read(ref _exception);
if (e != null)
throw e;
}
return _createdValue;
}
Expand Down