diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index 375dae94..c1a675e0 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -9,6 +9,14 @@ description: Utility The idempotency utility provides a simple solution to convert your Lambda functions into idempotent operations which are safe to retry. +## Key features + +* Prevent Lambda handler function from executing more than once on the same event payload during a time window +* Ensure Lambda handler returns the same result when called with the same payload +* Select a subset of the event as the idempotency key using [JMESPath](https://jmespath.org/) expressions +* Set a time window in which records with the same payload should be considered duplicates +* Expires in-progress executions if the Lambda function times out halfway through + ## Terminology The property of idempotency means that an operation does not cause additional side effects if it is called more than once with the same input parameters. @@ -17,12 +25,31 @@ The property of idempotency means that an operation does not cause additional si **Idempotency key** is a hash representation of either the entire event or a specific configured subset of the event, and invocation results are **JSON serialized** and stored in your persistence storage layer. -## Key features +**Idempotency record** is the data representation of an idempotent request saved in your preferred storage layer. We use it to coordinate whether a request is idempotent, whether it's still valid or expired based on timestamps, etc. + +
+```mermaid +classDiagram + direction LR + class DataRecord { + string IdempotencyKey + DataRecordStatus Status + long ExpiryTimestamp + long InProgressExpiryTimestamp + string ResponseData + string PayloadHash + } + class Status { + <> + INPROGRESS + COMPLETED + EXPIRED + } + DataRecord -- Status +``` +Idempotency record representation +
-* Prevent Lambda handler function from executing more than once on the same event payload during a time window -* Ensure Lambda handler returns the same result when called with the same payload -* Select a subset of the event as the idempotency key using [JMESPath](https://jmespath.org/) expressions -* Set a time window in which records with the same payload should be considered duplicates ## Getting started @@ -40,6 +67,13 @@ Or via the .NET Core command line interface: dotnet add package AWS.Lambda.Powertools.Idempotency ``` +### IAM Permissions + +Your Lambda function IAM Role must have `dynamodb:GetItem`, `dynamodb:PutItem`, `dynamodb:UpdateItem` and `dynamodb:DeleteItem` IAM permissions before using this feature. + +???+ note +If you're using our example [AWS Serverless Application Model (SAM)](#required-resources), [AWS Cloud Development Kit (CDK)](#required-resources), or [Terraform](#required-resources) it already adds the required permissions. + ### Required resources Before getting started, you need to create a persistent storage layer where the idempotency utility can store its state - your Lambda functions will need read and write access to it. @@ -106,7 +140,7 @@ You can quickly start by configuring `Idempotency` and using it with the `Idempo !!! warning "Important" Initialization and configuration of the `Idempotency` must be performed outside the handler, preferably in the constructor. - ```csharp + ```csharp hl_lines="4 7" public class Function { public Function() @@ -133,7 +167,7 @@ When using `Idempotent` attribute on another method, you must tell which paramet !!! info "The parameter must be serializable in JSON. We use `System.Text.Json` internally to (de)serialize objects" - ```csharp + ```csharp hl_lines="4 13-14" public class Function { public Function() @@ -143,12 +177,12 @@ When using `Idempotent` attribute on another method, you must tell which paramet public Task FunctionHandler(string input, ILambdaContext context) { - dummpy("hello", "world") + MyInternalMethod("hello", "world") return Task.FromResult(input.ToUpper()); } [Idempotent] - private string dummy(string argOne, [IdempotencyKey] string argTwo) { + private string MyInternalMethod(string argOne, [IdempotencyKey] string argTwo) { return "something"; } } @@ -167,19 +201,102 @@ In this example, we have a Lambda handler that creates a payment for a user subs Imagine the function executes successfully, but the client never receives the response due to a connection issue. It is safe to retry in this instance, as the idempotent decorator will return a previously saved response. -!!! warning "Warning: Idempotency for JSON payloads" +**What we want here** is to instruct Idempotency to use `user_id` and `product_id` fields from our incoming payload as our idempotency key. +If we were to treat the entire request as our idempotency key, a simple HTTP header change would cause our customer to be charged twice. + +???+ tip "Deserializing JSON strings in payloads for increased accuracy." The payload extracted by the `EventKeyJmesPath` is treated as a string by default, so will be sensitive to differences in whitespace even when the JSON payload itself is identical. To alter this behaviour, you can use the JMESPath built-in function `powertools_json()` to treat the payload as a JSON object rather than a string. - ```csharp +=== "Payment function" + + ```csharp hl_lines="3" Idempotency.Configure(builder => builder .WithOptions(optionsBuilder => - optionsBuilder.WithEventKeyJmesPath("powertools_json(Body).address")) + optionsBuilder.WithEventKeyJmesPath("powertools_json(Body).[\"user_id\", \"product_id\"]")) .UseDynamoDb("idempotency_table")); ``` +=== "Sample event" + + ```json hl_lines="27" + { + "version": "2.0", + "routeKey": "ANY /createpayment", + "rawPath": "/createpayment", + "rawQueryString": "", + "headers": { + "Header1": "value1", + "Header2": "value2" + }, + "requestContext": { + "accountId": "123456789012", + "apiId": "api-id", + "domainName": "id.execute-api.us-east-1.amazonaws.com", + "domainPrefix": "id", + "http": { + "method": "POST", + "path": "/createpayment", + "protocol": "HTTP/1.1", + "sourceIp": "ip", + "userAgent": "agent" + }, + "requestId": "id", + "routeKey": "ANY /createpayment", + "stage": "$default", + "time": "10/Feb/2021:13:40:43 +0000", + "timeEpoch": 1612964443723 + }, + "body": "{\"user_id\":\"xyz\",\"product_id\":\"123456789\"}", + "isBase64Encoded": false + } + ``` + +### Lambda timeouts + +???+ note +This is automatically done when you decorate your Lambda handler with [Idempotent attribute](#idempotent-attribute). + +To prevent against extended failed retries when a [Lambda function times out](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-verify-invocation-timeouts/){target="_blank"}, +Powertools for AWS Lambda (.NET) calculates and includes the remaining invocation available time as part of the idempotency record. + +???+ example +If a second invocation happens **after** this timestamp, and the record is marked as `INPROGRESS`, we will execute the invocation again as if it was in the `EXPIRED` state (e.g, `Expired` field elapsed). + + This means that if an invocation expired during execution, it will be quickly executed again on the next retry. + +???+ important +If you are only using the [Idempotent attribute](#Idempotent-attribute-on-another-method) to guard isolated parts of your code, +you must use `RegisterLambdaContext` available in the `Idempotency` static class to benefit from this protection. + +Here is an example on how you register the Lambda context in your handler: + +=== "Registering the Lambda context" + + ```csharp hl_lines="9" title="Registering the Lambda context" + public class Function + { + public Function() + { + Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table")); + } + + public Task FunctionHandler(string input, ILambdaContext context) + { + Idempotency.RegisterLambdaContext(context); + MyInternalMethod("hello", "world") + return Task.FromResult(input.ToUpper()); + } + + [Idempotent] + private string MyInternalMethod(string argOne, [IdempotencyKey] string argTwo) { + return "something"; + } + } + ``` + ### Handling exceptions If you are using the `Idempotent` attribute on your Lambda handler or any other method, any unhandled exceptions that are thrown during the code execution will cause **the record in the persistence layer to be deleted**. @@ -190,6 +307,254 @@ This means that new invocations will execute your code again despite having the As this happens outside the scope of your decorated function, you are not able to catch it. +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Persistence Layer + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set (id=event.search(payload)) + activate Persistence Layer + Note right of Persistence Layer: Locked during this time. Prevents multiple
Lambda invocations with the same
payload running concurrently. + Lambda--xLambda: Call handler (event).
Raises exception + Lambda->>Persistence Layer: Delete record (id=event.search(payload)) + deactivate Persistence Layer + Lambda-->>Client: Return error response +``` +Idempotent sequence exception +
+ +If you are using `Idempotent` attribute on another method, any unhandled exceptions that are raised _inside_ the decorated function will cause the record in the persistence layer to be deleted, and allow the function to be executed again if retried. + +If an Exception is raised _outside_ the scope of the decorated method and after your method has been called, the persistent record will not be affected. In this case, idempotency will be maintained for your decorated function. Example: + +=== "Handling exceptions" + + ```csharp hl_lines="2-4 8-10" title="Exception not affecting idempotency record sample" + public class Function + { + public Function() + { + Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table")); + } + + public Task FunctionHandler(string input, ILambdaContext context) + { + Idempotency.RegisterLambdaContext(context); + // If an exception is thrown here, no idempotent record will ever get created as the + // idempotent method does not get called + + MyInternalMethod("hello", "world") + + // This exception will not cause the idempotent record to be deleted, since it + // happens after the decorated method has been successfully called + throw new Exception(); + } + + [Idempotent] + private string MyInternalMethod(string argOne, [IdempotencyKey] string argTwo) { + return "something"; + } + } + ``` + +### Idempotency request flow + +The following sequence diagrams explain how the Idempotency feature behaves under different scenarios. + +#### Successful request + +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Persistence Layer + alt initial request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Note over Lambda,Persistence Layer: Set record status to INPROGRESS.
Prevents concurrent invocations
with the same payload + Lambda-->>Lambda: Call your function + Lambda->>Persistence Layer: Update record with result + deactivate Persistence Layer + Persistence Layer-->>Persistence Layer: Update record + Note over Lambda,Persistence Layer: Set record status to COMPLETE.
New invocations with the same payload
now return the same result + Lambda-->>Client: Response sent to client + else retried request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Persistence Layer-->>Lambda: Already exists in persistence layer. + deactivate Persistence Layer + Note over Lambda,Persistence Layer: Record status is COMPLETE and not expired + Lambda-->>Client: Same response sent to client + end +``` +Idempotent successful request +
+ +#### Successful request with cache enabled + +!!! note "[In-memory cache is disabled by default](#using-in-memory-cache)." + +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Persistence Layer + alt initial request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Note over Lambda,Persistence Layer: Set record status to INPROGRESS.
Prevents concurrent invocations
with the same payload + Lambda-->>Lambda: Call your function + Lambda->>Persistence Layer: Update record with result + deactivate Persistence Layer + Persistence Layer-->>Persistence Layer: Update record + Note over Lambda,Persistence Layer: Set record status to COMPLETE.
New invocations with the same payload
now return the same result + Lambda-->>Lambda: Save record and result in memory + Lambda-->>Client: Response sent to client + else retried request + Client->>Lambda: Invoke (event) + Lambda-->>Lambda: Get idempotency_key=hash(payload) + Note over Lambda,Persistence Layer: Record status is COMPLETE and not expired + Lambda-->>Client: Same response sent to client + end +``` +Idempotent successful request cached +
+ +#### Expired idempotency records + +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Persistence Layer + alt initial request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Note over Lambda,Persistence Layer: Set record status to INPROGRESS.
Prevents concurrent invocations
with the same payload + Lambda-->>Lambda: Call your function + Lambda->>Persistence Layer: Update record with result + deactivate Persistence Layer + Persistence Layer-->>Persistence Layer: Update record + Note over Lambda,Persistence Layer: Set record status to COMPLETE.
New invocations with the same payload
now return the same result + Lambda-->>Client: Response sent to client + else retried request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Persistence Layer-->>Lambda: Already exists in persistence layer. + deactivate Persistence Layer + Note over Lambda,Persistence Layer: Record status is COMPLETE but expired hours ago + loop Repeat initial request process + Note over Lambda,Persistence Layer: 1. Set record to INPROGRESS,
2. Call your function,
3. Set record to COMPLETE + end + Lambda-->>Client: Same response sent to client + end +``` +Previous Idempotent request expired +
+ +#### Concurrent identical in-flight requests + +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Persistence Layer + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Note over Lambda,Persistence Layer: Set record status to INPROGRESS.
Prevents concurrent invocations
with the same payload + par Second request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + Lambda--xLambda: IdempotencyAlreadyInProgressError + Lambda->>Client: Error sent to client if unhandled + end + Lambda-->>Lambda: Call your function + Lambda->>Persistence Layer: Update record with result + deactivate Persistence Layer + Persistence Layer-->>Persistence Layer: Update record + Note over Lambda,Persistence Layer: Set record status to COMPLETE.
New invocations with the same payload
now return the same result + Lambda-->>Client: Response sent to client +``` +Concurrent identical in-flight requests +
+ +#### Lambda request timeout + +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Persistence Layer + alt initial request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Note over Lambda,Persistence Layer: Set record status to INPROGRESS.
Prevents concurrent invocations
with the same payload + Lambda-->>Lambda: Call your function + Note right of Lambda: Time out + Lambda--xLambda: Time out error + Lambda-->>Client: Return error response + deactivate Persistence Layer + else retry after Lambda timeout elapses + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Note over Lambda,Persistence Layer: Set record status to INPROGRESS.
Reset in_progress_expiry attribute + Lambda-->>Lambda: Call your function + Lambda->>Persistence Layer: Update record with result + deactivate Persistence Layer + Persistence Layer-->>Persistence Layer: Update record + Lambda-->>Client: Response sent to client + end +``` +Idempotent request during and after Lambda timeouts +
+ +#### Optional idempotency key + +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Persistence Layer + alt request with idempotency key + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set idempotency_key=hash(payload) + activate Persistence Layer + Note over Lambda,Persistence Layer: Set record status to INPROGRESS.
Prevents concurrent invocations
with the same payload + Lambda-->>Lambda: Call your function + Lambda->>Persistence Layer: Update record with result + deactivate Persistence Layer + Persistence Layer-->>Persistence Layer: Update record + Note over Lambda,Persistence Layer: Set record status to COMPLETE.
New invocations with the same payload
now return the same result + Lambda-->>Client: Response sent to client + else request(s) without idempotency key + Client->>Lambda: Invoke (event) + Note over Lambda: Idempotency key is missing + Note over Persistence Layer: Skips any operation to fetch, update, and delete + Lambda-->>Lambda: Call your function + Lambda-->>Client: Response sent to client + end +``` +Optional idempotency key +
+ +## Advanced + ### Persistence stores #### DynamoDBPersistenceStore @@ -205,23 +570,24 @@ new DynamoDBPersistenceStoreBuilder() .WithStatusAttr("current_status") .WithDataAttr("result_data") .WithValidationAttr("validation_key") + .WithInProgressExpiryAttr("in_progress_expires_at") .Build() ``` When using DynamoDB as a persistence layer, you can alter the attribute names by passing these parameters when initializing the persistence layer: -| Parameter | Required | Default | Description | -|--------------------|----------|--------------------------------------|--------------------------------------------------------------------------------------------------------| -| **TableName** | Y | | Table name to store state | -| **KeyAttr** | | `id` | Partition key of the table. Hashed representation of the payload (unless **SortKeyAttr** is specified) | -| **ExpiryAttr** | | `expiration` | Unix timestamp of when record expires | -| **StatusAttr** | | `status` | Stores status of the Lambda execution during and after invocation | -| **DataAttr** | | `data` | Stores results of successfully idempotent methods | -| **ValidationAttr** | | `validation` | Hashed representation of the parts of the event used for validation | -| **SortKeyAttr** | | | Sort key of the table (if table is configured with a sort key). | -| **StaticPkValue** | | `idempotency#{LAMBDA_FUNCTION_NAME}` | Static value to use as the partition key. Only used when **SortKeyAttr** is set. | +| Parameter | Required | Default | Description | +|----------------------------|----------|--------------------------------------|--------------------------------------------------------------------------------------------------------| +| **TableName** | Y | | Table name to store state | +| **KeyAttr** | | `id` | Partition key of the table. Hashed representation of the payload (unless **SortKeyAttr** is specified) | +| **ExpiryAttr** | | `expiration` | Unix timestamp of when record expires | +| **InProgressExpiryAttr** | | `in_progress_expiration` | Unix timestamp of when record expires while in progress (in case of the invocation times out) | +| **StatusAttr** | | `status` | Stores status of the Lambda execution during and after invocation | +| **DataAttr** | | `data` | Stores results of successfully idempotent methods | +| **ValidationAttr** | | `validation` | Hashed representation of the parts of the event used for validation | +| **SortKeyAttr** | | | Sort key of the table (if table is configured with a sort key). | +| **StaticPkValue** | | `idempotency#{LAMBDA_FUNCTION_NAME}` | Static value to use as the partition key. Only used when **SortKeyAttr** is set. | -## Advanced ### Customizing the default behavior diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/AWS.Lambda.Powertools.Idempotency.csproj b/libraries/src/AWS.Lambda.Powertools.Idempotency/AWS.Lambda.Powertools.Idempotency.csproj index 7a8b58e9..a4ff4eb2 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/AWS.Lambda.Powertools.Idempotency.csproj +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/AWS.Lambda.Powertools.Idempotency.csproj @@ -30,7 +30,7 @@ - + diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs index d81fb428..e8de9f52 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Idempotency.cs @@ -14,6 +14,7 @@ */ using System; +using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Common; using AWS.Lambda.Powertools.Idempotency.Persistence; @@ -85,6 +86,21 @@ public static void Configure(Action configurationAction) Instance.SetPersistenceStore(builder.Store); } + /// + /// Holds ILambdaContext + /// + public ILambdaContext LambdaContext { get; private set; } + + /// + /// Can be used in a method which is not the handler to capture the Lambda context, + /// to calculate the remaining time before the invocation times out. + /// + /// + public static void RegisterLambdaContext(ILambdaContext context) + { + Instance.LambdaContext = context; + } + /// /// Create a builder that can be used to configure and create /// diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/IdempotentAttribute.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/IdempotentAttribute.cs index d211a7cd..9f1683fa 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/IdempotentAttribute.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/IdempotentAttribute.cs @@ -89,7 +89,7 @@ protected internal sealed override T WrapSync(Func target, objec Task ResultDelegate() => Task.FromResult(target(args)); - var idempotencyHandler = new IdempotencyAspectHandler(ResultDelegate, eventArgs.Method.Name, payload); + var idempotencyHandler = new IdempotencyAspectHandler(ResultDelegate, eventArgs.Method.Name, payload,GetContext(eventArgs)); if (idempotencyHandler == null) { throw new Exception("Failed to create an instance of IdempotencyAspectHandler"); @@ -127,7 +127,7 @@ protected internal sealed override async Task WrapAsync( Task ResultDelegate() => target(args); - var idempotencyHandler = new IdempotencyAspectHandler(ResultDelegate, eventArgs.Method.Name, payload); + var idempotencyHandler = new IdempotencyAspectHandler(ResultDelegate, eventArgs.Method.Name, payload, GetContext(eventArgs)); if (idempotencyHandler == null) { throw new Exception("Failed to create an instance of IdempotencyAspectHandler"); @@ -172,4 +172,14 @@ private static bool IsPlacedOnRequestHandler(MethodBase method) //Check if method has two arguments and the second one is of type ILambdaContext return method.GetParameters().Length == 2 && method.GetParameters()[1].ParameterType == typeof(ILambdaContext); } + + private static ILambdaContext GetContext(AspectEventArgs args) + { + if (IsPlacedOnRequestHandler(args.Method)) + { + return (ILambdaContext)args.Args[1]; + } + + return Idempotency.Instance.LambdaContext; + } } \ No newline at end of file diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Internal/IdempotencyAspectHandler.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Internal/IdempotencyAspectHandler.cs index 0124b3b6..a78ce309 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Internal/IdempotencyAspectHandler.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Internal/IdempotencyAspectHandler.cs @@ -16,6 +16,7 @@ using System; using System.Text.Json; using System.Threading.Tasks; +using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Idempotency.Exceptions; using AWS.Lambda.Powertools.Idempotency.Persistence; @@ -35,6 +36,9 @@ internal class IdempotencyAspectHandler /// Request payload /// private readonly JsonDocument _data; + + private readonly ILambdaContext _lambdaContext; + /// /// Persistence store /// @@ -46,13 +50,16 @@ internal class IdempotencyAspectHandler /// /// /// + /// public IdempotencyAspectHandler( Func> target, string functionName, - JsonDocument payload) + JsonDocument payload, + ILambdaContext lambdaContext) { _target = target; _data = payload; + _lambdaContext = lambdaContext; _persistenceStore = Idempotency.Instance.PersistenceStore; _persistenceStore.Configure(Idempotency.Instance.IdempotencyOptions, functionName); } @@ -94,7 +101,7 @@ private async Task ProcessIdempotency() { // We call saveInProgress first as an optimization for the most common case where no idempotent record // already exists. If it succeeds, there's no need to call getRecord. - await _persistenceStore.SaveInProgress(_data, DateTimeOffset.UtcNow); + await _persistenceStore.SaveInProgress(_data, DateTimeOffset.UtcNow, GetRemainingTimeInMillis()); } catch (IdempotencyItemAlreadyExistsException) { @@ -167,6 +174,10 @@ private Task HandleForStatus(DataRecord record) case DataRecord.DataRecordStatus.EXPIRED: throw new IdempotencyInconsistentStateException("saveInProgress and getRecord return inconsistent results"); case DataRecord.DataRecordStatus.INPROGRESS: + if (record.InProgressExpiryTimestamp.HasValue && record.InProgressExpiryTimestamp.Value < DateTimeOffset.Now.ToUnixTimeMilliseconds()) + { + throw new IdempotencyInconsistentStateException("Item should have been expired in-progress because it already time-outed."); + } throw new IdempotencyAlreadyInProgressException("Execution already in progress with idempotency key: " + record.IdempotencyKey); case DataRecord.DataRecordStatus.COMPLETED: @@ -234,4 +245,17 @@ private async Task GetFunctionResponse() return response; } + + /// + /// Tries to determine the remaining time available for the current lambda invocation. + /// Currently, it only works if the idempotent handler decorator is used or using {Idempotency#registerLambdaContext(Context)} + /// + /// the remaining time in milliseconds or empty if the context was not provided/found + private double? GetRemainingTimeInMillis() { + if (_lambdaContext != null) { + // why TotalMilliseconds? Because it must be the complete duration of the timespan expressed in milliseconds + return _lambdaContext.RemainingTime.TotalMilliseconds; + } + return null; + } } \ No newline at end of file diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs index a20acf07..27ae02f6 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs @@ -113,8 +113,9 @@ public virtual async Task SaveSuccess(JsonDocument data, object result, DateTime /// /// Payload /// The current date time + /// The remaining time from lambda execution /// - public virtual async Task SaveInProgress(JsonDocument data, DateTimeOffset now) + public virtual async Task SaveInProgress(JsonDocument data, DateTimeOffset now, double? remainingTimeInMs) { var idempotencyKey = GetHashedIdempotencyKey(data); @@ -122,13 +123,21 @@ public virtual async Task SaveInProgress(JsonDocument data, DateTimeOffset now) { throw new IdempotencyItemAlreadyExistsException(); } + + long? inProgressExpirationMsTimestamp = null; + if (remainingTimeInMs.HasValue) + { + inProgressExpirationMsTimestamp = now.AddMilliseconds(remainingTimeInMs.Value).ToUnixTimeMilliseconds(); + } var record = new DataRecord( idempotencyKey, DataRecord.DataRecordStatus.INPROGRESS, GetExpiryEpochSecond(now), null, - GetHashedPayload(data) + GetHashedPayload(data), + inProgressExpirationMsTimestamp + ); await PutRecord(record, now); } diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DataRecord.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DataRecord.cs index c35d9626..52eb1638 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DataRecord.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DataRecord.cs @@ -35,17 +35,20 @@ public class DataRecord /// Unix timestamp of when record expires /// JSON serialized invocation results /// A hash representation of the entire event + /// Unix timestamp of in-progress field for the remaining lambda execution time public DataRecord(string idempotencyKey, DataRecordStatus status, long expiryTimestamp, string responseData, - string payloadHash) + string payloadHash, + long? inProgressExpiryTimestamp = null) { IdempotencyKey = idempotencyKey; _status = status.ToString(); ExpiryTimestamp = expiryTimestamp; ResponseData = responseData; PayloadHash = payloadHash; + InProgressExpiryTimestamp = inProgressExpiryTimestamp; } /// @@ -53,9 +56,27 @@ public DataRecord(string idempotencyKey, /// public string IdempotencyKey { get; } /// - /// Unix timestamp of when record expires + /// Unix timestamp of when record expires. + /// This field is controlling how long the result of the idempotent + /// event is cached. It is stored in _seconds since epoch_. + /// DynamoDB's TTL mechanism is used to remove the record once the + /// expiry has been reached, and subsequent execution of the request + /// will be permitted. The user must configure this on their table. /// public long ExpiryTimestamp { get; } + + /// + /// The in-progress field is set to the remaining lambda execution time + /// when the record is created. + /// This field is stored in _milliseconds since epoch_. + /// + /// This ensures that: + /// 1/ other concurrently executing requests are blocked from starting + /// 2/ if a lambda times out, subsequent requests will be allowed again, despite + /// the fact that the idempotency record is already in the table + /// + public long? InProgressExpiryTimestamp { get; } + /// /// JSON serialized invocation results /// diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DynamoDBPersistenceStore.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DynamoDBPersistenceStore.cs index bec8f308..9b8cf500 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DynamoDBPersistenceStore.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DynamoDBPersistenceStore.cs @@ -50,6 +50,12 @@ public class DynamoDBPersistenceStore : BasePersistenceStore /// Expiry attribute /// private readonly string _expiryAttr; + + /// + /// In progress expiry attribute + /// + private readonly string _inProgressExpiryAttr; + /// /// Status attribute /// @@ -75,6 +81,7 @@ public class DynamoDBPersistenceStore : BasePersistenceStore /// /// /// + /// /// /// /// @@ -84,6 +91,7 @@ internal DynamoDBPersistenceStore(string tableName, string staticPkValue, string sortKeyAttr, string expiryAttr, + string inProgressExpiryAttr, string statusAttr, string dataAttr, string validationAttr, @@ -94,6 +102,7 @@ internal DynamoDBPersistenceStore(string tableName, _staticPkValue = staticPkValue; _sortKeyAttr = sortKeyAttr; _expiryAttr = expiryAttr; + _inProgressExpiryAttr = inProgressExpiryAttr; _statusAttr = statusAttr; _dataAttr = dataAttr; _validationAttr = validationAttr; @@ -139,8 +148,7 @@ public override async Task GetRecord(string idempotencyKey) return ItemToRecord(response.Item); } - - + /// public override async Task PutRecord(DataRecord record, DateTimeOffset now) { @@ -155,6 +163,13 @@ public override async Task PutRecord(DataRecord record, DateTimeOffset now) { _statusAttr, new AttributeValue(record.Status.ToString()) } }; + if (record.InProgressExpiryTimestamp.HasValue) { + item.Add(_inProgressExpiryAttr, new AttributeValue + { + N = record.InProgressExpiryTimestamp.Value.ToString() + }); + } + if (PayloadValidationEnabled) { item.Add(_validationAttr, new AttributeValue(record.PayloadHash)); @@ -165,18 +180,22 @@ public override async Task PutRecord(DataRecord record, DateTimeOffset now) var expressionAttributeNames = new Dictionary { {"#id", _keyAttr}, - {"#expiry", _expiryAttr} + {"#expiry", _expiryAttr}, + {"#in_progress_expiry", _inProgressExpiryAttr}, + {"#status", _statusAttr} }; var request = new PutItemRequest { TableName = _tableName, Item = item, - ConditionExpression = "attribute_not_exists(#id) OR #expiry < :now", + ConditionExpression = "attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_milliseconds AND #status = :inprogress)", ExpressionAttributeNames = expressionAttributeNames, ExpressionAttributeValues = new Dictionary { - {":now", new AttributeValue {N = now.ToUnixTimeSeconds().ToString()}} + {":now", new AttributeValue {N = now.ToUnixTimeSeconds().ToString()}}, + {":now_milliseconds", new AttributeValue {N = now.ToUnixTimeMilliseconds().ToString()}}, + {":inprogress", new AttributeValue {S = Enum.GetName(DataRecord.DataRecordStatus.INPROGRESS) }} } }; await _dynamoDbClient!.PutItemAsync(request); @@ -247,12 +266,15 @@ private DataRecord ItemToRecord(Dictionary item) // data and validation payload may be null var hasDataAttribute = item.TryGetValue(_dataAttr, out var data); var hasValidationAttribute = item.TryGetValue(_validationAttr, out var validation); + var hasInProgressExpiryAttr = item.TryGetValue(_inProgressExpiryAttr, out var inProgExp); + return new DataRecord(item[_sortKeyAttr ?? _keyAttr].S, Enum.Parse(item[_statusAttr].S), long.Parse(item[_expiryAttr].N), hasDataAttribute ? data?.S : null, - hasValidationAttribute ? validation?.S : null); + hasValidationAttribute ? validation?.S : null, + hasInProgressExpiryAttr ? long.Parse(inProgExp.N) : null); } /// @@ -311,6 +333,12 @@ public class DynamoDBPersistenceStoreBuilder /// Expiry attribute /// private string _expiryAttr = "expiration"; + + /// + /// In progress expiry attribute + /// + private string _inProgressExpiryAttr = "in_progress_expiration"; + /// /// Status attribute /// @@ -346,7 +374,8 @@ public DynamoDBPersistenceStore Build() _keyAttr, _staticPkValue, _sortKeyAttr, - _expiryAttr, + _expiryAttr, + _inProgressExpiryAttr, _statusAttr, _dataAttr, _validationAttr, @@ -408,6 +437,16 @@ public DynamoDBPersistenceStoreBuilder WithExpiryAttr(string expiryAttr) _expiryAttr = expiryAttr; return this; } + + /// + /// DynamoDB attribute name for in progress expiry timestamp (optional), by default "in_progress_expiration" + /// + /// name of the attribute in the table + /// the builder instance (to chain operations) + public DynamoDBPersistenceStoreBuilder WithInProgressExpiryAttr(string inProgressExpiryAttr) { + _inProgressExpiryAttr = inProgressExpiryAttr; + return this; + } /// /// DynamoDB attribute name for status (optional), by default "status" diff --git a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/IPersistenceStore.cs b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/IPersistenceStore.cs index 7e00371c..c1fd8e82 100644 --- a/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/IPersistenceStore.cs +++ b/libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/IPersistenceStore.cs @@ -35,7 +35,13 @@ public interface IPersistenceStore Task GetRecord(string idempotencyKey); /// - /// Add a DataRecord to persistence store if it does not already exist with that key + /// Add a DataRecord to persistence store if it does not already exist with that key. + /// Stores the given idempotency record in the DDB store. If there + /// is an existing record that has expired - either due to the + /// cache expiry or due to the in_progress_expiry - the record + /// will be overwritten and the idempotent operation can continue. + /// Note: This method writes only expiry and status information - not + /// the results of the operation itself. /// /// record DataRecord instance /// diff --git a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Handlers/IdempotencyInternalFunction.cs b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Handlers/IdempotencyInternalFunction.cs index 8ce1c2b3..a9ffcfe5 100644 --- a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Handlers/IdempotencyInternalFunction.cs +++ b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Handlers/IdempotencyInternalFunction.cs @@ -23,8 +23,20 @@ namespace AWS.Lambda.Powertools.Idempotency.Tests.Handlers; /// public class IdempotencyInternalFunction { + private readonly bool _registerContext; + public bool IsSubMethodCalled { get; private set; } = false; + + public IdempotencyInternalFunction(bool registerContext) + { + _registerContext = registerContext; + } + public Basket HandleRequest(Product input, ILambdaContext context) { + if (_registerContext) { + Idempotency.RegisterLambdaContext(context); + } + return CreateBasket("fake", input); } @@ -36,6 +48,4 @@ private Basket CreateBasket([IdempotencyKey]string magicProduct, Product p) b.Add(new Product(0, magicProduct, 0)); return b; } - - public bool IsSubMethodCalled { get; private set; } = false; } \ No newline at end of file diff --git a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Internal/IdempotentAspectTests.cs b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Internal/IdempotentAspectTests.cs index beb7f7d9..c5eda8c2 100644 --- a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Internal/IdempotentAspectTests.cs +++ b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Internal/IdempotentAspectTests.cs @@ -27,6 +27,7 @@ using NSubstitute; using NSubstitute.ExceptionExtensions; using Xunit; +// ReSharper disable CompareOfFloatsByEqualityOperator [assembly: CollectionBehavior(DisableTestParallelization = true)] @@ -48,11 +49,16 @@ public async Task Handle_WhenFirstCall_ShouldPutInStore(Type type) .WithOptions(optionsBuilder => optionsBuilder.WithEventKeyJmesPath("Id")) ); + var context = new TestLambdaContext + { + RemainingTime = TimeSpan.FromSeconds(30) + }; + var function = Activator.CreateInstance(type) as IIdempotencyEnabledFunction; var product = new Product(42, "fake product", 12); //Act - var basket = await function!.HandleTest(product, new TestLambdaContext()); + var basket = await function!.HandleTest(product, context); //Assert basket.Products.Count.Should().Be(1); @@ -61,7 +67,7 @@ public async Task Handle_WhenFirstCall_ShouldPutInStore(Type type) await store.Received().SaveInProgress( Arg.Is(t => t.ToString() == JsonSerializer.SerializeToDocument(product, new JsonSerializerOptions()).ToString()), - Arg.Any() + Arg.Any(), Arg.Is(d => d == 30000) ); await store.Received().SaveSuccess( @@ -78,7 +84,7 @@ public async Task Handle_WhenSecondCall_AndNotExpired_ShouldGetFromStore(Type ty { //Arrange var store = Substitute.For(); - store.SaveInProgress(Arg.Any(), Arg.Any()) + store.SaveInProgress(Arg.Any(), Arg.Any(), Arg.Any()) .Returns(_ => throw new IdempotencyItemAlreadyExistsException()); // GIVEN @@ -122,7 +128,7 @@ public async Task Handle_WhenSecondCall_AndStatusInProgress_ShouldThrowIdempoten .WithOptions(optionsBuilder => optionsBuilder.WithEventKeyJmesPath("Id")) ); - store.SaveInProgress(Arg.Any(), Arg.Any()) + store.SaveInProgress(Arg.Any(), Arg.Any(), Arg.Any()) .Returns(_ => throw new IdempotencyItemAlreadyExistsException()); var product = new Product(42, "fake product", 12); @@ -143,6 +149,46 @@ public async Task Handle_WhenSecondCall_AndStatusInProgress_ShouldThrowIdempoten // Assert await act.Should().ThrowAsync(); } + + [Theory] + [InlineData(typeof(IdempotencyEnabledFunction))] + [InlineData(typeof(IdempotencyEnabledSyncFunction))] + public async Task Handle_WhenSecondCall_InProgress_LambdaTimeout_Expired_ShouldThrowIdempotencyInconsistentStateException(Type type) + { + // Arrange + var store = Substitute.For(); + + Idempotency.Configure(builder => + builder + .WithPersistenceStore(store) + .WithOptions(optionsBuilder => optionsBuilder.WithEventKeyJmesPath("Id")) + ); + + store.SaveInProgress(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(_ => throw new IdempotencyItemAlreadyExistsException()); + + var timestampInThePast = DateTimeOffset.Now.AddSeconds(-30).ToUnixTimeMilliseconds(); + + var product = new Product(42, "fake product", 12); + var basket = new Basket(product); + var record = new DataRecord( + "42", + DataRecord.DataRecordStatus.INPROGRESS, + DateTimeOffset.UtcNow.AddSeconds(356).ToUnixTimeSeconds(), + JsonSerializer.SerializeToNode(basket)!.ToString(), + null, + timestampInThePast); + + store.GetRecord(Arg.Any(), Arg.Any()) + .Returns(record); + + // Act + var function = Activator.CreateInstance(type) as IIdempotencyEnabledFunction; + Func act = async () => await function!.HandleTest(product, new TestLambdaContext()); + + // Assert + await act.Should().ThrowAsync(); + } [Theory] [InlineData(typeof(IdempotencyWithErrorFunction))] @@ -231,22 +277,27 @@ public async Task Handle_WhenIdempotencyOnSubMethodAnnotated_AndFirstCall_Should var store = Substitute.For(); Idempotency.Configure(builder => builder.WithPersistenceStore(store)); + var context = new TestLambdaContext + { + RemainingTime = TimeSpan.FromSeconds(30) + }; + // Act - IdempotencyInternalFunction function = new IdempotencyInternalFunction(); + IdempotencyInternalFunction function = new IdempotencyInternalFunction(true); Product product = new Product(42, "fake product", 12); - Basket resultBasket = function.HandleRequest(product, new TestLambdaContext()); - + Basket resultBasket = function.HandleRequest(product, context); + // Assert resultBasket.Products.Count.Should().Be(2); function.IsSubMethodCalled.Should().BeTrue(); - + await store .Received(1) .SaveInProgress( Arg.Is(t => t.ToString() == JsonSerializer.SerializeToDocument("fake", new JsonSerializerOptions()) .ToString()), - Arg.Any()); + Arg.Any(), Arg.Is(d => d == 30000)); await store .Received(1) @@ -259,7 +310,7 @@ public void Handle_WhenIdempotencyOnSubMethodAnnotated_AndSecondCall_AndNotExpir { // Arrange var store = Substitute.For(); - store.SaveInProgress(Arg.Any(), Arg.Any()) + store.SaveInProgress(Arg.Any(), Arg.Any(), Arg.Any()) .Throws(new IdempotencyItemAlreadyExistsException()); Idempotency.Configure(builder => builder.WithPersistenceStore(store)); @@ -276,7 +327,7 @@ public void Handle_WhenIdempotencyOnSubMethodAnnotated_AndSecondCall_AndNotExpir .Returns(record); // Act - var function = new IdempotencyInternalFunction(); + var function = new IdempotencyInternalFunction(false); Basket resultBasket = function.HandleRequest(product, new TestLambdaContext()); // assert diff --git a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs index 43dbbc38..0b0ff4ad 100644 --- a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs +++ b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs @@ -81,7 +81,7 @@ public async Task SaveInProgress_WhenDefaultConfig_ShouldSaveRecordInStore() var now = DateTimeOffset.UtcNow; // Act - await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now); + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); // Assert var dr = persistenceStore.DataRecord; @@ -92,6 +92,32 @@ public async Task SaveInProgress_WhenDefaultConfig_ShouldSaveRecordInStore() dr.PayloadHash.Should().BeEmpty(); persistenceStore.Status.Should().Be(1); } + + [Fact] + public async Task SaveInProgress_WhenRemainingTime_ShouldSaveRecordInStore() + { + // Arrange + var persistenceStore = new InMemoryPersistenceStore(); + var request = LoadApiGatewayProxyRequest(); + + persistenceStore.Configure(new IdempotencyOptionsBuilder().Build(), null); + + var now = DateTimeOffset.UtcNow; + var lambdaTimeoutMs = 30000; + + // Act + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, lambdaTimeoutMs); + + // Assert + var dr = persistenceStore.DataRecord; + dr.Status.Should().Be(DataRecord.DataRecordStatus.INPROGRESS); + dr.ExpiryTimestamp.Should().Be(now.AddSeconds(3600).ToUnixTimeSeconds()); + dr.ResponseData.Should().BeNull(); + dr.IdempotencyKey.Should().Be("testFunction#5eff007a9ed2789a9f9f6bc182fc6ae6"); + dr.PayloadHash.Should().BeEmpty(); + dr.InProgressExpiryTimestamp.Should().Be(now.AddMilliseconds(lambdaTimeoutMs).ToUnixTimeMilliseconds()); + persistenceStore.Status.Should().Be(1); + } [Fact] public async Task SaveInProgress_WhenKeyJmesPathIsSet_ShouldSaveRecordInStore_WithIdempotencyKeyEqualsKeyJmesPath() @@ -107,7 +133,7 @@ public async Task SaveInProgress_WhenKeyJmesPathIsSet_ShouldSaveRecordInStore_Wi var now = DateTimeOffset.UtcNow; // Act - await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now); + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); // Assert var dr = persistenceStore.DataRecord; @@ -133,7 +159,7 @@ public async Task SaveInProgress_WhenKeyJmesPathIsSetToMultipleFields_ShouldSave var now = DateTimeOffset.UtcNow; // Act - await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now); + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); // Assert var dr = persistenceStore.DataRecord; @@ -160,7 +186,7 @@ public async Task SaveInProgress_WhenJMESPath_NotFound_ShouldThrowException() var now = DateTimeOffset.UtcNow; // Act - var act = async () => await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now); + var act = async () => await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); // Assert await act.Should() @@ -184,7 +210,7 @@ public async Task SaveInProgress_WhenJMESpath_NotFound_ShouldNotThrowException() var now = DateTimeOffset.UtcNow; // Act - await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now); + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); // Assert var dr = persistenceStore.DataRecord; @@ -215,7 +241,7 @@ public async Task SaveInProgress_WhenLocalCacheIsSet_AndNotExpired_ShouldThrowEx ); // Act - var act = () => persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now); + var act = () => persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); // Assert await act.Should() @@ -248,7 +274,7 @@ public async Task SaveInProgress_WhenLocalCacheIsSetButExpired_ShouldRemoveFromC ); // Act - await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now); + await persistenceStore.SaveInProgress(JsonSerializer.SerializeToDocument(request)!, now, null); // Assert var dr = persistenceStore.DataRecord; diff --git a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs index a6f61531..957adc3f 100644 --- a/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs +++ b/libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/DynamoDBPersistenceStoreTests.cs @@ -81,7 +81,7 @@ public async Task PutRecord_WhenRecordAlreadyExist_ShouldThrowIdempotencyItemAlr // Insert a fake item with same id Dictionary item = new(key); var now = DateTimeOffset.UtcNow; - var expiry = now.AddSeconds(30).ToUnixTimeMilliseconds(); + var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); item.Add("expiration", new AttributeValue {N = expiry.ToString()}); item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.COMPLETED.ToString())); item.Add("data", new AttributeValue("Fake Data")); @@ -116,6 +116,101 @@ await _client.PutItemAsync(new PutItemRequest itemInDb["data"].S.Should().Be("Fake Data"); } + [Fact] + public async Task PutRecord_ShouldBlockUpdate_IfRecordAlreadyExistAndProgressNotExpiredAfterLambdaTimedOut() + { + // Arrange + var key = CreateKey("key"); + + // Insert a fake item with same id + Dictionary item = new(key); + var now = DateTimeOffset.UtcNow; + var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); + var progressExpiry = now.AddSeconds(30).ToUnixTimeMilliseconds(); + + item.Add("expiration", new AttributeValue {N = expiry.ToString()}); + item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.INPROGRESS.ToString())); + item.Add("data", new AttributeValue("Fake Data")); + item.Add("in_progress_expiration", new AttributeValue {N = progressExpiry.ToString()}); + + await _client.PutItemAsync(new PutItemRequest + { + TableName = _tableName, + Item = item + }); + + var expiry2 = now.AddSeconds(3600).ToUnixTimeSeconds(); + // Act + var act = () => _dynamoDbPersistenceStore.PutRecord( + new DataRecord("key", + DataRecord.DataRecordStatus.INPROGRESS, + expiry2, + "Fake Data 2", + null + ), now); + + // Assert + await act.Should().ThrowAsync(); + + // item was not updated, retrieve the initial one + var itemInDb = (await _client.GetItemAsync(new GetItemRequest + { + TableName = _tableName, + Key = key + })).Item; + itemInDb.Should().NotBeNull(); + itemInDb["status"].S.Should().Be("INPROGRESS"); + itemInDb["expiration"].N.Should().Be(expiry.ToString()); + itemInDb["data"].S.Should().Be("Fake Data"); + } + + [Fact] + public async Task PutRecord_ShouldCreateRecordInDynamoDB_IfLambdaWasInProgressAndTimedOut() + { + // Arrange + var key = CreateKey("key"); + + // Insert a fake item with same id + Dictionary item = new(key); + var now = DateTimeOffset.UtcNow; + var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); + var progressExpiry = now.AddSeconds(-30).ToUnixTimeMilliseconds(); + + item.Add("expiration", new AttributeValue {N = expiry.ToString()}); + item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.INPROGRESS.ToString())); + item.Add("data", new AttributeValue("Fake Data")); + item.Add("in_progress_expiration", new AttributeValue {N = progressExpiry.ToString()}); + + await _client.PutItemAsync(new PutItemRequest + { + TableName = _tableName, + Item = item + }); + + var expiry2 = now.AddSeconds(3600).ToUnixTimeSeconds(); + + // Act + await _dynamoDbPersistenceStore.PutRecord( + new DataRecord("key", + DataRecord.DataRecordStatus.INPROGRESS, + expiry2, + null, + null + ), now); + + // Assert + // an item is inserted + var itemInDb = (await _client.GetItemAsync(new GetItemRequest + { + TableName = _tableName, + Key = key + })).Item; + + itemInDb.Should().NotBeNull(); + itemInDb["status"].S.Should().Be("INPROGRESS"); + itemInDb["expiration"].N.Should().Be(expiry2.ToString()); + } + //getRecord [Fact] public async Task GetRecord_WhenRecordExistsInDynamoDb_ShouldReturnExistingRecord() @@ -129,7 +224,7 @@ public async Task GetRecord_WhenRecordExistsInDynamoDb_ShouldReturnExistingRecor {"id", new AttributeValue("key")} //key }; var now = DateTimeOffset.UtcNow; - var expiry = now.AddSeconds(30).ToUnixTimeMilliseconds(); + var expiry = now.AddSeconds(30).ToUnixTimeSeconds(); item.Add("expiration", new AttributeValue { N = expiry.ToString() @@ -173,7 +268,7 @@ public async Task UpdateRecord_WhenRecordExistsInDynamoDb_ShouldUpdateRecord() var key = CreateKey("key"); Dictionary item = new(key); var now = DateTimeOffset.UtcNow; - var expiry = now.AddSeconds(360).ToUnixTimeMilliseconds(); + var expiry = now.AddSeconds(360).ToUnixTimeSeconds(); item.Add("expiration", new AttributeValue { N = expiry.ToString() @@ -189,7 +284,7 @@ await _client.PutItemAsync(new PutItemRequest null); // Act - expiry = now.AddSeconds(3600).ToUnixTimeMilliseconds(); + expiry = now.AddSeconds(3600).ToUnixTimeSeconds(); var record = new DataRecord("key", DataRecord.DataRecordStatus.COMPLETED, expiry, "Fake result", "hash"); await _dynamoDbPersistenceStore.UpdateRecord(record); @@ -214,7 +309,7 @@ public async Task DeleteRecord_WhenRecordExistsInDynamoDb_ShouldDeleteRecord() var key = CreateKey("key"); Dictionary item = new(key); var now = DateTimeOffset.UtcNow; - var expiry = now.AddSeconds(360).ToUnixTimeMilliseconds(); + var expiry = now.AddSeconds(360).ToUnixTimeSeconds(); item.Add("expiration", new AttributeValue {N=expiry.ToString()}); item.Add("status", new AttributeValue(DataRecord.DataRecordStatus.INPROGRESS.ToString())); await _client.PutItemAsync(new PutItemRequest @@ -278,7 +373,7 @@ public async Task EndToEndWithCustomAttrNamesAndSortKey() var record = new DataRecord( "mykey", DataRecord.DataRecordStatus.INPROGRESS, - now.AddSeconds(400).ToUnixTimeMilliseconds(), + now.AddSeconds(400).ToUnixTimeSeconds(), null, null ); @@ -310,7 +405,7 @@ public async Task EndToEndWithCustomAttrNamesAndSortKey() var updatedRecord = new DataRecord( "mykey", DataRecord.DataRecordStatus.COMPLETED, - now.AddSeconds(500).ToUnixTimeMilliseconds(), + now.AddSeconds(500).ToUnixTimeSeconds(), "response", null );