Skip to content

[Core] Wiring up rehydration token logic in data-plane #49224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 12, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,21 @@ public static IOperation Create(
apiVersionStr = !skipApiVersionOverride && TryGetApiVersion(startRequestUri, out ReadOnlySpan<char> apiVersion) ? apiVersion.ToString() : null;
}
var headerSource = GetHeaderSource(requestMethod, startRequestUri, response, apiVersionStr, out string nextRequestUri, out bool isNextRequestPolling);
if (headerSource == HeaderSource.None && IsFinalState(response, headerSource, out var failureState, out _))
{
return new CompletedOperation(failureState ?? GetOperationStateFromFinalResponse(requestMethod, response));
}

string? lastKnownLocation;
if (!response.Headers.TryGetValue("Location", out lastKnownLocation))
{
lastKnownLocation = null;
}
return new NextLinkOperationImplementation(pipeline, requestMethod, startRequestUri, nextRequestUri, headerSource, lastKnownLocation, finalStateVia, apiVersionStr, isNextRequestPolling : isNextRequestPolling);

NextLinkOperationImplementation operation = new(pipeline, requestMethod, startRequestUri, nextRequestUri, headerSource, lastKnownLocation, finalStateVia, apiVersionStr, isNextRequestPolling: isNextRequestPolling);

if (headerSource == HeaderSource.None && IsFinalState(response, headerSource, out var failureState, out _))
{
return new CompletedOperation(failureState ?? GetOperationStateFromFinalResponse(requestMethod, response), operation);
}

return operation;
}

public static IOperation<T> Create<T>(
Expand Down Expand Up @@ -647,12 +651,17 @@ private class CompletedOperation : IOperation
{
private readonly OperationState _operationState;

public CompletedOperation(OperationState operationState)
private readonly NextLinkOperationImplementation _operation;

public CompletedOperation(OperationState operationState, NextLinkOperationImplementation operation)
{
_operationState = operationState;
_operation = operation;
}

public ValueTask<OperationState> UpdateStateAsync(bool async, CancellationToken cancellationToken) => new(_operationState);

public RehydrationToken GetRehydrationToken() => _operation.GetRehydrationToken();
}

private sealed class OperationToOperationOfT<T> : IOperation<T>
Expand Down Expand Up @@ -685,6 +694,8 @@ public async ValueTask<OperationState<T>> UpdateStateAsync(bool async, Cancellat

return OperationState<T>.Pending(state.RawResponse);
}

public RehydrationToken GetRehydrationToken() => _operation.GetRehydrationToken();
}
}
}
7 changes: 7 additions & 0 deletions sdk/core/Azure.Core/src/Shared/OperationInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public OperationToOperationOfTProxy(IOperation operation)
_operation = operation;
}

public RehydrationToken GetRehydrationToken() => _operation.GetRehydrationToken();

public async ValueTask<OperationState<VoidValue>> UpdateStateAsync(bool async, CancellationToken cancellationToken)
{
var state = await _operation.UpdateStateAsync(async, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -172,6 +174,11 @@ internal interface IOperation
/// </list>
/// </returns>
ValueTask<OperationState> UpdateStateAsync(bool async, CancellationToken cancellationToken);

/// <summary>
/// Get a token that can be used to rehydrate the operation.
/// </summary>
RehydrationToken GetRehydrationToken();
}

/// <summary>
Expand Down
9 changes: 9 additions & 0 deletions sdk/core/Azure.Core/src/Shared/OperationInternalOfT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ private class FinalOperation : IOperation<T>
{
public ValueTask<OperationState<T>> UpdateStateAsync(bool async, CancellationToken cancellationToken)
=> throw new NotSupportedException("The operation has already completed");

// Unreachable path. _operation.GetRehydrationToken() is never invoked.
public RehydrationToken GetRehydrationToken()
=> throw new NotSupportedException($"Getting the rehydration token of a {nameof(FinalOperation)} is not supported");
}
}

Expand Down Expand Up @@ -328,6 +332,11 @@ internal interface IOperation<T>
/// </list>
/// </returns>
ValueTask<OperationState<T>> UpdateStateAsync(bool async, CancellationToken cancellationToken);

/// <summary>
/// Get a token that can be used to rehydrate the operation.
/// </summary>
RehydrationToken GetRehydrationToken();
}

/// <summary>
Expand Down
5 changes: 5 additions & 0 deletions sdk/core/Azure.Core/src/Shared/ProtocolOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ internal ProtocolOperation(ClientDiagnostics clientDiagnostics, HttpPipeline pip
public override string Id => throw new NotSupportedException();
#pragma warning restore CA1822

/// <inheritdoc />
public override RehydrationToken? GetRehydrationToken() => ((IOperation<T>)this).GetRehydrationToken();

RehydrationToken IOperation<T>.GetRehydrationToken() => _nextLinkOperation.GetRehydrationToken();

/// <inheritdoc />
public override T Value => _operation.Value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public ConvertOperation(Operation<TFrom> operation, ClientDiagnostics diagnostic
public override TTo Value => GetOrCreateValue();
public override bool HasValue => _operation.HasValue;
public override bool HasCompleted => _operation.HasCompleted;
public override RehydrationToken? GetRehydrationToken() => _operation.GetRehydrationToken();
public override Response GetRawResponse() => _operation.GetRawResponse();

public override Response UpdateStatus(CancellationToken cancellationToken = default)
Expand Down
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/tests/OperationInternalOfTTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,8 @@ public TestOperation(Func<bool, CancellationToken, ValueTask<OperationState<int>
}

public ValueTask<OperationState<int>> UpdateStateAsync(bool async, CancellationToken cancellationToken) => _updateStateAsyncHandler(async, cancellationToken);

public RehydrationToken GetRehydrationToken() => default;
}

private class CallCountStrategy : DelayStrategy
Expand Down
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/tests/OperationInternalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ public TestOperation(Func<bool, CancellationToken, ValueTask<OperationState>> up
}

public ValueTask<OperationState> UpdateStateAsync(bool async, CancellationToken cancellationToken) => _updateStateAsyncHandler(async, cancellationToken);

public RehydrationToken GetRehydrationToken() => default;
}

private class CallCountStrategy : DelayStrategy
Expand Down
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/tests/OperationPollerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public void WaitForCompletionResponseCancelled()
private class EndlessOperation : IOperation
{
public ValueTask<OperationState> UpdateStateAsync(bool async, CancellationToken cancellationToken) => new(OperationState.Pending(new MockResponse(200)));

public RehydrationToken GetRehydrationToken() => default;
}

private class TestDelayStrategy : DelayStrategy
Expand Down
197 changes: 197 additions & 0 deletions sdk/core/Azure.Core/tests/ProtocolOperationTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Azure.Core.Pipeline;
using Azure.Core.TestFramework;
using NUnit.Framework;

namespace Azure.Core.Tests
{
public class ProtocolOperationTests
{
[Test]
public void GetRehydrationToken()
{
const string OperationId = "oper-ati-oni-d00";
const string RequestUri = "https://www.example.com/request";
const string LocationUri = $"https://www.example.com/{OperationId}";

using var request = new MockRequest() { Method = RequestMethod.Post };
using var response = new MockResponse(200);
var transport = new MockTransport();
var pipeline = new HttpPipeline(transport);

request.Uri.Reset(new Uri(RequestUri));
response.AddHeader("Location", LocationUri);

var operation = new ProtocolOperation<MockJsonModel>(null, pipeline, request, response, OperationFinalStateVia.Location, null, null);
var token = operation.GetRehydrationToken();

Assert.True(token.HasValue);
Assert.AreEqual(OperationId, token.Value.Id);
Assert.AreEqual(NextLinkOperationImplementation.RehydrationTokenVersion, token.Value.Version);
Assert.AreEqual("Location", token.Value.HeaderSource);
Assert.AreEqual(LocationUri, token.Value.NextRequestUri);
Assert.AreEqual(RequestUri, token.Value.InitialUri);
Assert.AreEqual(RequestMethod.Post, token.Value.RequestMethod);
Assert.AreEqual(LocationUri, token.Value.LastKnownLocation);
Assert.AreEqual("Location", token.Value.FinalStateVia);
}

[Test]
public void GetRehydrationTokenWithCompletedOperation()
{
const string RequestUri = "https://www.example.com/request";

using var request = new MockRequest() { Method = RequestMethod.Post };
using var response = new MockResponse(200);
var transport = new MockTransport();
var pipeline = new HttpPipeline(transport);

request.Uri.Reset(new Uri(RequestUri));

var operation = new ProtocolOperation<MockJsonModel>(null, pipeline, request, response, OperationFinalStateVia.Location, null, null);
var token = operation.GetRehydrationToken();

Assert.True(token.HasValue);
Assert.AreEqual("NOT_SET", token.Value.Id);
Assert.AreEqual(NextLinkOperationImplementation.RehydrationTokenVersion, token.Value.Version);
Assert.AreEqual("None", token.Value.HeaderSource);
Assert.AreEqual(RequestUri, token.Value.NextRequestUri);
Assert.AreEqual(RequestUri, token.Value.InitialUri);
Assert.AreEqual(RequestMethod.Post, token.Value.RequestMethod);
Assert.Null(token.Value.LastKnownLocation);
Assert.AreEqual("Location", token.Value.FinalStateVia);
}

[Test]
[TestCase(true)]
[TestCase(false)]
public async Task RehydrationOperationCanPoll(bool isAsync)
{
const string RequestUri = "https://www.example.com/request";
const string LocationUri = "https://www.example.com/oper-ati-oni-d00";

using var request = new MockRequest() { Method = RequestMethod.Post };
using var response = new MockResponse(200);
var transport = new MockTransport(response);
var pipeline = new HttpPipeline(transport);

request.Uri.Reset(new Uri(RequestUri));
response.AddHeader("Location", LocationUri);

var operation = new ProtocolOperation<MockJsonModel>(null, pipeline, request, response, OperationFinalStateVia.Location, null, null);
var token = operation.GetRehydrationToken();
_ = isAsync
? await Operation.RehydrateAsync(pipeline, token.Value)
: Operation.Rehydrate(pipeline, token.Value);
var rehydrationUpdateRequest = transport.SingleRequest;

Assert.AreEqual(LocationUri, rehydrationUpdateRequest.Uri.ToString());
Assert.AreEqual(RequestMethod.Get, rehydrationUpdateRequest.Method);
}

[Test]
[TestCase(true)]
[TestCase(false)]
public async Task RehydrationOperationWithCompletedOperationCanPoll(bool isAsync)
{
const string RequestUri = "https://www.example.com/request";

using var request = new MockRequest() { Method = RequestMethod.Post };
using var response = new MockResponse(200);
var transport = new MockTransport(response);
var pipeline = new HttpPipeline(transport);

request.Uri.Reset(new Uri(RequestUri));

var operation = new ProtocolOperation<MockJsonModel>(null, pipeline, request, response, OperationFinalStateVia.Location, null, null);
var token = operation.GetRehydrationToken();
_ = isAsync
? await Operation.RehydrateAsync(pipeline, token.Value)
: Operation.Rehydrate(pipeline, token.Value);
var rehydrationUpdateRequest = transport.SingleRequest;

Assert.AreEqual(RequestUri, rehydrationUpdateRequest.Uri.ToString());
Assert.AreEqual(RequestMethod.Get, rehydrationUpdateRequest.Method);
}

[Test]
[TestCase(true)]
[TestCase(false)]
public async Task RehydrationOperationOfTCanPoll(bool isAsync)
{
const string RequestUri = "https://www.example.com/request";
const string LocationUri = "https://www.example.com/oper-ati-oni-d00";

using var request = new MockRequest() { Method = RequestMethod.Post };
using var response = new MockResponse(200);
var transport = new MockTransport(response);
var pipeline = new HttpPipeline(transport);
using var responseContentStream = new MemoryStream(Encoding.UTF8.GetBytes("""
{
"IntValue": 1,
"StringValue": "one"
}
"""));

request.Uri.Reset(new Uri(RequestUri));
response.AddHeader("Location", LocationUri);
response.ContentStream = responseContentStream;

var operation = new ProtocolOperation<MockJsonModel>(null, pipeline, request, response, OperationFinalStateVia.Location, null, null);
var token = operation.GetRehydrationToken();
var rehydrationOperation = isAsync
? await Operation.RehydrateAsync<MockJsonModel>(pipeline, token.Value)
: Operation.Rehydrate<MockJsonModel>(pipeline, token.Value);
var rehydrationUpdateRequest = transport.SingleRequest;

Assert.AreEqual(LocationUri, rehydrationUpdateRequest.Uri.ToString());
Assert.AreEqual(RequestMethod.Get, rehydrationUpdateRequest.Method);

Assert.True(rehydrationOperation.HasValue);
Assert.AreEqual(rehydrationOperation.Value.IntValue, 1);
Assert.AreEqual(rehydrationOperation.Value.StringValue, "one");
}

[Test]
[TestCase(true)]
[TestCase(false)]
public async Task RehydrationOperationOfTWithCompletedOperationCanPoll(bool isAsync)
{
const string RequestUri = "https://www.example.com/request";

using var request = new MockRequest() { Method = RequestMethod.Post };
using var response = new MockResponse(200);
var transport = new MockTransport(response);
var pipeline = new HttpPipeline(transport);
using var responseContentStream = new MemoryStream(Encoding.UTF8.GetBytes("""
{
"IntValue": 1,
"StringValue": "one"
}
"""));

request.Uri.Reset(new Uri(RequestUri));
response.ContentStream = responseContentStream;

var operation = new ProtocolOperation<MockJsonModel>(null, pipeline, request, response, OperationFinalStateVia.Location, null, null);
var token = operation.GetRehydrationToken();
var rehydrationOperation = isAsync
? await Operation.RehydrateAsync<MockJsonModel>(pipeline, token.Value)
: Operation.Rehydrate<MockJsonModel>(pipeline, token.Value);
var rehydrationUpdateRequest = transport.SingleRequest;

Assert.AreEqual(RequestUri, rehydrationUpdateRequest.Uri.ToString());
Assert.AreEqual(RequestMethod.Get, rehydrationUpdateRequest.Method);

Assert.True(rehydrationOperation.HasValue);
Assert.AreEqual(rehydrationOperation.Value.IntValue, 1);
Assert.AreEqual(rehydrationOperation.Value.StringValue, "one");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,5 +204,8 @@ async ValueTask<OperationState<AnalyzeResult>> IOperation<AnalyzeResult>.UpdateS

return OperationState<AnalyzeResult>.Pending(rawResponse);
}

// This method is never invoked since we don't override Operation<T>.GetRehydrationToken.
RehydrationToken IOperation<AnalyzeResult>.GetRehydrationToken() => default;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,8 @@ async ValueTask<OperationState<DocumentClassifierDetails>> IOperation<DocumentCl

return OperationState<DocumentClassifierDetails>.Pending(rawResponse);
}

// This method is never invoked since we don't override Operation<T>.GetRehydrationToken.
RehydrationToken IOperation<DocumentClassifierDetails>.GetRehydrationToken() => default;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,8 @@ async ValueTask<OperationState<DocumentModelDetails>> IOperation<DocumentModelDe

return OperationState<DocumentModelDetails>.Pending(rawResponse);
}

// This method is never invoked since we don't override Operation<T>.GetRehydrationToken.
RehydrationToken IOperation<DocumentModelDetails>.GetRehydrationToken() => default;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,5 +192,8 @@ async ValueTask<OperationState<AnalyzeResult>> IOperation<AnalyzeResult>.UpdateS

return OperationState<AnalyzeResult>.Pending(rawResponse);
}

// This method is never invoked since we don't override Operation<T>.GetRehydrationToken.
RehydrationToken IOperation<AnalyzeResult>.GetRehydrationToken() => default;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,8 @@ async ValueTask<OperationState<DocumentModelDetails>> IOperation<DocumentModelDe

return OperationState<DocumentModelDetails>.Pending(rawResponse);
}

// This method is never invoked since we don't override Operation<T>.GetRehydrationToken.
RehydrationToken IOperation<DocumentModelDetails>.GetRehydrationToken() => default;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,8 @@ async ValueTask<OperationState<DocumentModelDetails>> IOperation<DocumentModelDe

return OperationState<DocumentModelDetails>.Pending(rawResponse);
}

// This method is never invoked since we don't override Operation<T>.GetRehydrationToken.
RehydrationToken IOperation<DocumentModelDetails>.GetRehydrationToken() => default;
}
}
Loading
Loading