Skip to content

feat: Idempotency calculate remaining invocation available time as part of the idempotency record #363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
412 changes: 389 additions & 23 deletions docs/utilities/idempotency.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Amazon.Lambda.Core" Version="2.1.0" />
<PackageReference Include="Amazon.Lambda.Core" Version="1.0.0" />
<PackageReference Include="AWSSDK.DynamoDBv2" Version="3.7.104.1" />
<PackageReference Include="JmesPath.Net" Version="1.0.308" />
</ItemGroup>
Expand Down
16 changes: 16 additions & 0 deletions libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

using System;
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Common;
using AWS.Lambda.Powertools.Idempotency.Persistence;

Expand Down Expand Up @@ -85,6 +86,21 @@ public static void Configure(Action<IdempotencyBuilder> configurationAction)
Instance.SetPersistenceStore(builder.Store);
}

/// <summary>
/// Holds ILambdaContext
/// </summary>
public ILambdaContext LambdaContext { get; private set; }

/// <summary>
/// Can be used in a method which is not the handler to capture the Lambda context,
/// to calculate the remaining time before the invocation times out.
/// </summary>
/// <param name="context"></param>
public static void RegisterLambdaContext(ILambdaContext context)
{
Instance.LambdaContext = context;
}

/// <summary>
/// Create a builder that can be used to configure and create <see cref="Idempotency"/>
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected internal sealed override T WrapSync<T>(Func<object[], T> target, objec

Task<T> ResultDelegate() => Task.FromResult(target(args));

var idempotencyHandler = new IdempotencyAspectHandler<T>(ResultDelegate, eventArgs.Method.Name, payload);
var idempotencyHandler = new IdempotencyAspectHandler<T>(ResultDelegate, eventArgs.Method.Name, payload,GetContext(eventArgs));
if (idempotencyHandler == null)
{
throw new Exception("Failed to create an instance of IdempotencyAspectHandler");
Expand Down Expand Up @@ -127,7 +127,7 @@ protected internal sealed override async Task<T> WrapAsync<T>(

Task<T> ResultDelegate() => target(args);

var idempotencyHandler = new IdempotencyAspectHandler<T>(ResultDelegate, eventArgs.Method.Name, payload);
var idempotencyHandler = new IdempotencyAspectHandler<T>(ResultDelegate, eventArgs.Method.Name, payload, GetContext(eventArgs));
if (idempotencyHandler == null)
{
throw new Exception("Failed to create an instance of IdempotencyAspectHandler");
Expand Down Expand Up @@ -172,4 +172,14 @@ private static bool IsPlacedOnRequestHandler(MethodBase method)
//Check if method has two arguments and the second one is of type ILambdaContext
return method.GetParameters().Length == 2 && method.GetParameters()[1].ParameterType == typeof(ILambdaContext);
}

private static ILambdaContext GetContext(AspectEventArgs args)
{
if (IsPlacedOnRequestHandler(args.Method))
{
return (ILambdaContext)args.Args[1];
}

return Idempotency.Instance.LambdaContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Idempotency.Exceptions;
using AWS.Lambda.Powertools.Idempotency.Persistence;

Expand All @@ -35,6 +36,9 @@ internal class IdempotencyAspectHandler<T>
/// Request payload
/// </summary>
private readonly JsonDocument _data;

private readonly ILambdaContext _lambdaContext;

/// <summary>
/// Persistence store
/// </summary>
Expand All @@ -46,13 +50,16 @@ internal class IdempotencyAspectHandler<T>
/// <param name="target"></param>
/// <param name="functionName"></param>
/// <param name="payload"></param>
/// <param name="lambdaContext"></param>
public IdempotencyAspectHandler(
Func<Task<T>> target,
string functionName,
JsonDocument payload)
JsonDocument payload,
ILambdaContext lambdaContext)
{
_target = target;
_data = payload;
_lambdaContext = lambdaContext;
_persistenceStore = Idempotency.Instance.PersistenceStore;
_persistenceStore.Configure(Idempotency.Instance.IdempotencyOptions, functionName);
}
Expand Down Expand Up @@ -94,7 +101,7 @@ private async Task<T> ProcessIdempotency()
{
// We call saveInProgress first as an optimization for the most common case where no idempotent record
// already exists. If it succeeds, there's no need to call getRecord.
await _persistenceStore.SaveInProgress(_data, DateTimeOffset.UtcNow);
await _persistenceStore.SaveInProgress(_data, DateTimeOffset.UtcNow, GetRemainingTimeInMillis());
}
catch (IdempotencyItemAlreadyExistsException)
{
Expand Down Expand Up @@ -167,6 +174,10 @@ private Task<T> HandleForStatus(DataRecord record)
case DataRecord.DataRecordStatus.EXPIRED:
throw new IdempotencyInconsistentStateException("saveInProgress and getRecord return inconsistent results");
case DataRecord.DataRecordStatus.INPROGRESS:
if (record.InProgressExpiryTimestamp.HasValue && record.InProgressExpiryTimestamp.Value < DateTimeOffset.Now.ToUnixTimeMilliseconds())
{
throw new IdempotencyInconsistentStateException("Item should have been expired in-progress because it already time-outed.");
}
throw new IdempotencyAlreadyInProgressException("Execution already in progress with idempotency key: " +
record.IdempotencyKey);
case DataRecord.DataRecordStatus.COMPLETED:
Expand Down Expand Up @@ -234,4 +245,17 @@ private async Task<T> GetFunctionResponse()

return response;
}

/// <summary>
/// Tries to determine the remaining time available for the current lambda invocation.
/// Currently, it only works if the idempotent handler decorator is used or using {Idempotency#registerLambdaContext(Context)}
/// </summary>
/// <returns>the remaining time in milliseconds or empty if the context was not provided/found</returns>
private double? GetRemainingTimeInMillis() {
if (_lambdaContext != null) {
// why TotalMilliseconds? Because it must be the complete duration of the timespan expressed in milliseconds
return _lambdaContext.RemainingTime.TotalMilliseconds;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,31 @@ public virtual async Task SaveSuccess(JsonDocument data, object result, DateTime
/// </summary>
/// <param name="data">Payload</param>
/// <param name="now">The current date time</param>
/// <param name="remainingTimeInMs">The remaining time from lambda execution</param>
/// <exception cref="IdempotencyItemAlreadyExistsException"></exception>
public virtual async Task SaveInProgress(JsonDocument data, DateTimeOffset now)
public virtual async Task SaveInProgress(JsonDocument data, DateTimeOffset now, double? remainingTimeInMs)
{
var idempotencyKey = GetHashedIdempotencyKey(data);

if (RetrieveFromCache(idempotencyKey, now) != null)
{
throw new IdempotencyItemAlreadyExistsException();
}

long? inProgressExpirationMsTimestamp = null;
if (remainingTimeInMs.HasValue)
{
inProgressExpirationMsTimestamp = now.AddMilliseconds(remainingTimeInMs.Value).ToUnixTimeMilliseconds();
}

var record = new DataRecord(
idempotencyKey,
DataRecord.DataRecordStatus.INPROGRESS,
GetExpiryEpochSecond(now),
null,
GetHashedPayload(data)
GetHashedPayload(data),
inProgressExpirationMsTimestamp

);
await PutRecord(record, now);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,48 @@ public class DataRecord
/// <param name="expiryTimestamp">Unix timestamp of when record expires</param>
/// <param name="responseData">JSON serialized invocation results</param>
/// <param name="payloadHash">A hash representation of the entire event</param>
/// <param name="inProgressExpiryTimestamp">Unix timestamp of in-progress field for the remaining lambda execution time</param>
public DataRecord(string idempotencyKey,
DataRecordStatus status,
long expiryTimestamp,
string responseData,
string payloadHash)
string payloadHash,
long? inProgressExpiryTimestamp = null)
{
IdempotencyKey = idempotencyKey;
_status = status.ToString();
ExpiryTimestamp = expiryTimestamp;
ResponseData = responseData;
PayloadHash = payloadHash;
InProgressExpiryTimestamp = inProgressExpiryTimestamp;
}

/// <summary>
/// A hash representation of either the entire event or a specific configured subset of the event
/// </summary>
public string IdempotencyKey { get; }
/// <summary>
/// Unix timestamp of when record expires
/// Unix timestamp of when record expires.
/// This field is controlling how long the result of the idempotent
/// event is cached. It is stored in _seconds since epoch_.
/// DynamoDB's TTL mechanism is used to remove the record once the
/// expiry has been reached, and subsequent execution of the request
/// will be permitted. The user must configure this on their table.
/// </summary>
public long ExpiryTimestamp { get; }

/// <summary>
/// The in-progress field is set to the remaining lambda execution time
/// when the record is created.
/// This field is stored in _milliseconds since epoch_.
///
/// This ensures that:
/// 1/ other concurrently executing requests are blocked from starting
/// 2/ if a lambda times out, subsequent requests will be allowed again, despite
/// the fact that the idempotency record is already in the table
/// </summary>
public long? InProgressExpiryTimestamp { get; }

/// <summary>
/// JSON serialized invocation results
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public class DynamoDBPersistenceStore : BasePersistenceStore
/// Expiry attribute
/// </summary>
private readonly string _expiryAttr;

/// <summary>
/// In progress expiry attribute
/// </summary>
private readonly string _inProgressExpiryAttr;

/// <summary>
/// Status attribute
/// </summary>
Expand All @@ -75,6 +81,7 @@ public class DynamoDBPersistenceStore : BasePersistenceStore
/// <param name="staticPkValue"></param>
/// <param name="sortKeyAttr"></param>
/// <param name="expiryAttr"></param>
/// <param name="inProgressExpiryAttr"></param>
/// <param name="statusAttr"></param>
/// <param name="dataAttr"></param>
/// <param name="validationAttr"></param>
Expand All @@ -84,6 +91,7 @@ internal DynamoDBPersistenceStore(string tableName,
string staticPkValue,
string sortKeyAttr,
string expiryAttr,
string inProgressExpiryAttr,
string statusAttr,
string dataAttr,
string validationAttr,
Expand All @@ -94,6 +102,7 @@ internal DynamoDBPersistenceStore(string tableName,
_staticPkValue = staticPkValue;
_sortKeyAttr = sortKeyAttr;
_expiryAttr = expiryAttr;
_inProgressExpiryAttr = inProgressExpiryAttr;
_statusAttr = statusAttr;
_dataAttr = dataAttr;
_validationAttr = validationAttr;
Expand Down Expand Up @@ -139,8 +148,7 @@ public override async Task<DataRecord> GetRecord(string idempotencyKey)

return ItemToRecord(response.Item);
}



/// <inheritdoc />
public override async Task PutRecord(DataRecord record, DateTimeOffset now)
{
Expand All @@ -155,6 +163,13 @@ public override async Task PutRecord(DataRecord record, DateTimeOffset now)
{ _statusAttr, new AttributeValue(record.Status.ToString()) }
};

if (record.InProgressExpiryTimestamp.HasValue) {
item.Add(_inProgressExpiryAttr, new AttributeValue
{
N = record.InProgressExpiryTimestamp.Value.ToString()
});
}

if (PayloadValidationEnabled)
{
item.Add(_validationAttr, new AttributeValue(record.PayloadHash));
Expand All @@ -165,18 +180,22 @@ public override async Task PutRecord(DataRecord record, DateTimeOffset now)
var expressionAttributeNames = new Dictionary<string, string>
{
{"#id", _keyAttr},
{"#expiry", _expiryAttr}
{"#expiry", _expiryAttr},
{"#in_progress_expiry", _inProgressExpiryAttr},
{"#status", _statusAttr}
};

var request = new PutItemRequest
{
TableName = _tableName,
Item = item,
ConditionExpression = "attribute_not_exists(#id) OR #expiry < :now",
ConditionExpression = "attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_milliseconds AND #status = :inprogress)",
ExpressionAttributeNames = expressionAttributeNames,
ExpressionAttributeValues = new Dictionary<string, AttributeValue>
{
{":now", new AttributeValue {N = now.ToUnixTimeSeconds().ToString()}}
{":now", new AttributeValue {N = now.ToUnixTimeSeconds().ToString()}},
{":now_milliseconds", new AttributeValue {N = now.ToUnixTimeMilliseconds().ToString()}},
{":inprogress", new AttributeValue {S = Enum.GetName(DataRecord.DataRecordStatus.INPROGRESS) }}
}
};
await _dynamoDbClient!.PutItemAsync(request);
Expand Down Expand Up @@ -247,12 +266,15 @@ private DataRecord ItemToRecord(Dictionary<string, AttributeValue> item)
// data and validation payload may be null
var hasDataAttribute = item.TryGetValue(_dataAttr, out var data);
var hasValidationAttribute = item.TryGetValue(_validationAttr, out var validation);
var hasInProgressExpiryAttr = item.TryGetValue(_inProgressExpiryAttr, out var inProgExp);


return new DataRecord(item[_sortKeyAttr ?? _keyAttr].S,
Enum.Parse<DataRecord.DataRecordStatus>(item[_statusAttr].S),
long.Parse(item[_expiryAttr].N),
hasDataAttribute ? data?.S : null,
hasValidationAttribute ? validation?.S : null);
hasValidationAttribute ? validation?.S : null,
hasInProgressExpiryAttr ? long.Parse(inProgExp.N) : null);
}

/// <summary>
Expand Down Expand Up @@ -311,6 +333,12 @@ public class DynamoDBPersistenceStoreBuilder
/// Expiry attribute
/// </summary>
private string _expiryAttr = "expiration";

/// <summary>
/// In progress expiry attribute
/// </summary>
private string _inProgressExpiryAttr = "in_progress_expiration";

/// <summary>
/// Status attribute
/// </summary>
Expand Down Expand Up @@ -346,7 +374,8 @@ public DynamoDBPersistenceStore Build()
_keyAttr,
_staticPkValue,
_sortKeyAttr,
_expiryAttr,
_expiryAttr,
_inProgressExpiryAttr,
_statusAttr,
_dataAttr,
_validationAttr,
Expand Down Expand Up @@ -408,6 +437,16 @@ public DynamoDBPersistenceStoreBuilder WithExpiryAttr(string expiryAttr)
_expiryAttr = expiryAttr;
return this;
}

/// <summary>
/// DynamoDB attribute name for in progress expiry timestamp (optional), by default "in_progress_expiration"
/// </summary>
/// <param name="inProgressExpiryAttr">name of the attribute in the table</param>
/// <returns>the builder instance (to chain operations)</returns>
public DynamoDBPersistenceStoreBuilder WithInProgressExpiryAttr(string inProgressExpiryAttr) {
_inProgressExpiryAttr = inProgressExpiryAttr;
return this;
}

/// <summary>
/// DynamoDB attribute name for status (optional), by default "status"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ public interface IPersistenceStore
Task<DataRecord> GetRecord(string idempotencyKey);

/// <summary>
/// Add a DataRecord to persistence store if it does not already exist with that key
/// Add a DataRecord to persistence store if it does not already exist with that key.
/// Stores the given idempotency record in the DDB store. If there
/// is an existing record that has expired - either due to the
/// cache expiry or due to the in_progress_expiry - the record
/// will be overwritten and the idempotent operation can continue.
/// Note: This method writes only expiry and status information - not
/// the results of the operation itself.
/// </summary>
/// <param name="record">record DataRecord instance</param>
/// <param name="now"></param>
Expand Down
Loading