diff --git a/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs b/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs new file mode 100644 index 000000000..ef96c213e --- /dev/null +++ b/Test/DurableTask.AzureStorage.Tests/TestPartitionIndex.cs @@ -0,0 +1,131 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.AzureStorage.Tests +{ + using System.Collections.Generic; + using System.Threading; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class TestPartitionIndex + { + private AzureStorageOrchestrationService azureStorageOrchestrationService; + private AzureStorageOrchestrationServiceSettings settings; + private int partitionCount = 4; + private CancellationTokenSource cancellationTokenSource; + private const string TaskHub = "taskHubName"; + + private Dictionary partitionToInstanceId = new Dictionary() + { + { 0, "sampleinstanceid!13"}, + { 1, "sampleinstanceid!3"}, + { 2, "sampleinstanceid!1!"}, + { 3, "sampleinstanceid!1"} + }; + + private Dictionary partitionToInstanceIdWithExplicitPartitionPlacement = new Dictionary() + { + { 0, "sampleinstanceid!0"}, + { 1, "sampleinstanceid!2!1"}, + { 2, "sampleinstanceid!2"}, + { 3, "sampleinstanceid!3"} + }; + + private Dictionary partitionToInstanceIdWithExplicitPartitionPlacementEndingWithExclamation = new Dictionary() + { + { 0, "sampleinstanceid!3!"}, + { 1, "sampleinstanceid!2!"}, + { 2, "sampleinstanceid!1!"}, + { 3, "sampleinstanceid!0!"} + }; + + [TestInitialize] + public void Initialize() + { + cancellationTokenSource = new CancellationTokenSource(); + + settings = new AzureStorageOrchestrationServiceSettings() + { + StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(), + TaskHubName = TaskHub, + PartitionCount = partitionCount + }; + + azureStorageOrchestrationService = new AzureStorageOrchestrationService(settings); + + string str = "sampleinstanceid!0!"; + var index = azureStorageOrchestrationService.GetPartitionIndex(str); + } + + [TestMethod] + public void GetPartitionIndexTest_EnableExplicitPartitionPlacement_False() + { + settings.EnableExplicitPartitionPlacement = false; + + foreach (var kvp in partitionToInstanceId) + { + var instanceId = kvp.Value; + var expectedPartitionIndex = kvp.Key; + var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId); + + Assert.AreEqual(expectedPartitionIndex, partitionIndex); + } + } + + [TestMethod] + public void GetPartitionIndexTest_EnableExplicitPartitionPlacement_True() + { + settings.EnableExplicitPartitionPlacement = true; + + foreach (var kvp in partitionToInstanceIdWithExplicitPartitionPlacement) + { + var instanceId = kvp.Value; + var expectedPartitionIndex = kvp.Key; + var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId); + + Assert.AreEqual(expectedPartitionIndex, partitionIndex); + } + } + + [TestMethod] + public void GetPartitionIndexTest_EndingWithExclamation_EnableExplicitPartitionPlacement_True() + { + settings.EnableExplicitPartitionPlacement = true; + + foreach (var kvp in partitionToInstanceIdWithExplicitPartitionPlacementEndingWithExclamation) + { + var instanceId = kvp.Value; + var expectedPartitionIndex = kvp.Key; + var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId); + + Assert.AreEqual(expectedPartitionIndex, partitionIndex); + } + } + + [TestMethod] + public void GetPartitionIndexTest_EndingWithExclamation_EnableExplicitPartitionPlacement_False() + { + settings.EnableExplicitPartitionPlacement = false; + + foreach (var kvp in partitionToInstanceIdWithExplicitPartitionPlacementEndingWithExclamation) + { + var instanceId = kvp.Value; + var expectedPartitionIndex = kvp.Key; + var partitionIndex = azureStorageOrchestrationService.GetPartitionIndex(instanceId); + + Assert.AreEqual(expectedPartitionIndex, partitionIndex); + } + } + } +} diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9f5d15047..63daf0ec1 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -2048,9 +2048,9 @@ public Task DownloadBlobAsync(string blobUri) // TODO: Change this to a sticky assignment so that partition count changes can // be supported: https://github.com/Azure/azure-functions-durable-extension/issues/1 - async Task GetControlQueueAsync(string instanceId) + internal async Task GetControlQueueAsync(string instanceId) { - uint partitionIndex = Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount; + uint partitionIndex = GetPartitionIndex(instanceId); string queueName = GetControlQueueName(this.settings.TaskHubName, (int)partitionIndex); ControlQueue cachedQueue; @@ -2075,6 +2075,30 @@ public Task DownloadBlobAsync(string blobUri) return cachedQueue; } + internal uint GetPartitionIndex(string instanceId) + { + uint totalPartitions = (uint)this.settings.PartitionCount; + + int placementSeparatorPosition = instanceId.LastIndexOf('!'); + + // if the instance id ends with !nnn, where nnn is an unsigned number, it indicates explicit partition placement + // if the instance id has multiple ! then only the last one will be considered. i.e., abd!2!3 then !3 will be considered. + // and if the instance id ends with !, then it will considered in non '!nnn' format and hashing will be used for control-queue placement. + if ( + this.settings.EnableExplicitPartitionPlacement + && placementSeparatorPosition != -1 + && uint.TryParse(instanceId.Substring(placementSeparatorPosition + 1), out uint index)) + { + var partitionId = index % totalPartitions; + return (uint)partitionId; + } + else + { + return Fnv1aHashHelper.ComputeHash(instanceId) % totalPartitions; + + } + } + /// /// Disposes of the current object. /// diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 609a8b35d..6518fb1b9 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -318,5 +318,14 @@ internal LogHelper Logger /// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true. /// public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; + + /// + /// Whether to allow instanceIDs to use special syntax to land on a specific partition. + /// If enabled, when an instanceID ends with suffix '!nnn', where 'nnn' is an unsigned number, the instance will land on the partition/queue for to that number. + /// + /// + /// It is not generally safe to change to this flag for pre-existing TaskHubs, as it may change the expected target queue for an instanceID. + /// + public bool EnableExplicitPartitionPlacement { get; set; } = false; } }