@@ -412,24 +412,6 @@ public async Task<ReadTableReponse> ReadAndWriteTableAsync(bool isShuttingDown,
412
412
try
413
413
{
414
414
await this . partitionTable . ReplaceEntityAsync ( partition , etag , forcefulShutdownToken ) ;
415
-
416
- // Ensure worker is listening to the control queue iff either:
417
- // 1) worker just claimed the lease,
418
- // 2) worker was already the owner in the partitions table and is not actively draining the queue.
419
- // Note that during draining, we renew the lease but do not want to listen to new messages.
420
- // Otherwise, we'll never finish draining our in-memory messages.
421
- // When drain completes, and the worker may decide to release the lease. In that moment,
422
- // IsDrainingPartition can still be true but renewedLease is false — without checking
423
- // !releasedLease, the worker could incorrectly resume listening just before releasing the lease.
424
- bool isRenewingToDrainQueue = renewedLease && response . IsDrainingPartition && ! releasedLease ;
425
- if ( claimedLease || ! isRenewingToDrainQueue )
426
- {
427
- // Notify the orchestration session manager that we acquired a lease for one of the partitions.
428
- // This will cause it to start reading control queue messages for that partition.
429
- await this . service . OnTableLeaseAcquiredAsync ( partition ) ;
430
- }
431
-
432
- this . LogHelper ( partition , claimedLease , stoleLease , renewedLease , drainedLease , releasedLease , previousOwner ) ;
433
415
}
434
416
catch ( DurableTaskStorageException ex ) when ( ex . HttpStatusCode == ( int ) HttpStatusCode . PreconditionFailed )
435
417
{
@@ -441,6 +423,24 @@ public async Task<ReadTableReponse> ReadAndWriteTableAsync(bool isShuttingDown,
441
423
$ "Failed to update table entry due to an Etag mismatch. Failed ETag value: '{ etag } '.") ;
442
424
throw ;
443
425
}
426
+
427
+ // Ensure worker is listening to the control queue iff either:
428
+ // 1) worker just claimed the lease,
429
+ // 2) worker was already the owner in the partitions table and is not actively draining the queue.
430
+ // Note that during draining, we renew the lease but do not want to listen to new messages.
431
+ // Otherwise, we'll never finish draining our in-memory messages.
432
+ // When drain completes, and the worker may decide to release the lease. In that moment,
433
+ // IsDrainingPartition can still be true but renewedLease is false — without checking
434
+ // !releasedLease, the worker could incorrectly resume listening just before releasing the lease.
435
+ bool isRenewingToDrainQueue = renewedLease && response . IsDrainingPartition && ! releasedLease ;
436
+ if ( claimedLease || ! isRenewingToDrainQueue )
437
+ {
438
+ // Notify the orchestration session manager that we acquired a lease for one of the partitions.
439
+ // This will cause it to start reading control queue messages for that partition.
440
+ await this . service . OnTableLeaseAcquiredAsync ( partition ) ;
441
+ }
442
+
443
+ this . LogHelper ( partition , claimedLease , stoleLease , renewedLease , drainedLease , releasedLease , previousOwner ) ;
444
444
}
445
445
}
446
446
0 commit comments