Skip to content

Commit c5dcafa

Browse files
committed
initial commit
1 parent 463c052 commit c5dcafa

File tree

1 file changed

+26
-18
lines changed

1 file changed

+26
-18
lines changed

src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,24 @@ public async Task<ReadTableReponse> ReadAndWriteTableAsync(bool isShuttingDown,
412412
try
413413
{
414414
await this.partitionTable.ReplaceEntityAsync(partition, etag, forcefulShutdownToken);
415+
416+
// Ensure worker is listening to the control queue iff either:
417+
// 1) worker just claimed the lease,
418+
// 2) worker was already the owner in the partitions table and is not actively draining the queue.
419+
// Note that during draining, we renew the lease but do not want to listen to new messages.
420+
// Otherwise, we'll never finish draining our in-memory messages.
421+
// When drain completes, and the worker may decide to release the lease. In that moment,
422+
// IsDrainingPartition can still be true but renewedLease is false — without checking
423+
// !releasedLease, the worker could incorrectly resume listening just before releasing the lease.
424+
bool isRenewingToDrainQueue = renewedLease && response.IsDrainingPartition && !releasedLease;
425+
if (claimedLease || !isRenewingToDrainQueue)
426+
{
427+
// Notify the orchestration session manager that we acquired a lease for one of the partitions.
428+
// This will cause it to start reading control queue messages for that partition.
429+
await this.service.OnTableLeaseAcquiredAsync(partition);
430+
}
431+
432+
this.LogHelper(partition, claimedLease, stoleLease, renewedLease, drainedLease, releasedLease, previousOwner);
415433
}
416434
catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
417435
{
@@ -423,19 +441,6 @@ public async Task<ReadTableReponse> ReadAndWriteTableAsync(bool isShuttingDown,
423441
$"Failed to update table entry due to an Etag mismatch. Failed ETag value: '{etag}'.");
424442
throw;
425443
}
426-
427-
// Ensure worker is listening to the control queue iff either:
428-
// 1) worker just claimed the lease,
429-
// 2) worker was already the owner in the partitions table and is not actively draining the queue. Note that during draining, we renew the lease but do not want to listen to new messages. Otherwise, we'll never finish draining our in-memory messages.
430-
bool isRenewingToDrainQueue = renewedLease & response.IsDrainingPartition;
431-
if (claimedLease || !isRenewingToDrainQueue)
432-
{
433-
// Notify the orchestration session manager that we acquired a lease for one of the partitions.
434-
// This will cause it to start reading control queue messages for that partition.
435-
await this.service.OnTableLeaseAcquiredAsync(partition);
436-
}
437-
438-
this.LogHelper(partition, claimedLease, stoleLease, renewedLease, drainedLease, releasedLease, previousOwner);
439444
}
440445
}
441446

@@ -505,7 +510,8 @@ void RenewOrReleaseMyLease(
505510
partition,
506511
ref releasedLease,
507512
ref renewedLease,
508-
ref drainedLease);
513+
ref drainedLease,
514+
CloseReason.LeaseLost);
509515
}
510516
}
511517

@@ -583,7 +589,8 @@ void TryDrainAndReleaseAllPartitions(
583589
partition,
584590
ref releasedLease,
585591
ref renewedLease,
586-
ref drainedLease);
592+
ref drainedLease,
593+
CloseReason.Shutdown);
587594

588595
if (releasedLease)
589596
{
@@ -661,7 +668,7 @@ await this.partitionTable.ReplaceEntityAsync(
661668
partition,
662669
etag,
663670
forceShutdownToken);
664-
671+
665672
this.settings.Logger.LeaseStealingSucceeded(
666673
this.storageAccountName,
667674
this.settings.TaskHubName,
@@ -815,7 +822,8 @@ void CheckDrainTask(
815822
TablePartitionLease partition,
816823
ref bool releasedLease,
817824
ref bool renewedLease,
818-
ref bool drainedLease)
825+
ref bool drainedLease,
826+
CloseReason reason)
819827
{
820828
// Check if drain process has started.
821829
if (this.backgroundDrainTasks.TryGetValue(partition.RowKey!, out Task? drainTask))
@@ -844,7 +852,7 @@ void CheckDrainTask(
844852
}
845853
else// If drain task hasn't been started yet, start it and keep renewing the lease to prevent it from expiring.
846854
{
847-
this.DrainPartition(partition, CloseReason.Shutdown);
855+
this.DrainPartition(partition, reason);
848856
this.RenewLease(partition);
849857
renewedLease = true;
850858
drainedLease = true;

0 commit comments

Comments
 (0)