[azeventhubs] Improperly resetting etag in the checkpoint store #20737

Merged 2 commits on May 4, 2023

13 changes: 9 additions & 4 deletions sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go
@@ -206,13 +206,18 @@ func (cps *testCheckpointStore) ClaimOwnership(ctx context.Context, partitionOwn
 
     current, exists := cps.ownerships[key]
 
-    if exists && po.ETag != nil && *current.ETag != *po.ETag {
-        // can't own it, didn't have the expected etag
-        return nil, nil
+    if exists {
+        if po.ETag == nil {
+            panic("Ownership blob exists, we should have claimed it using an etag")
+        }
+
+        if *po.ETag != *current.ETag {
+            // can't own it, didn't have the expected etag
+            return nil, nil
+        }
     }
 
     newOwnership := po
 
     uuid, err := uuid.New()
 
     if err != nil {
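
For context, the test store change above tightens the etag contract that ClaimOwnership is expected to honor: an ownership that already exists may only be claimed with the etag it was listed with, a mismatched etag silently loses the claim, and a missing etag is now treated as a caller bug. A minimal, self-contained sketch of that decision logic (simplified types and names, not the SDK's API):

package main

import "fmt"

// ownership is a simplified stand-in for the SDK's Ownership type.
type ownership struct {
    PartitionID string
    OwnerID     string
    ETag        *string
}

// claim mirrors the rules enforced by the updated test store:
//   - no existing ownership: claim succeeds
//   - existing ownership, matching etag: claim succeeds
//   - existing ownership, stale etag: claim is silently lost (nil, nil)
//   - existing ownership, no etag at all: caller bug
func claim(store map[string]ownership, po ownership) (*ownership, error) {
    current, exists := store[po.PartitionID]

    if exists {
        if po.ETag == nil {
            return nil, fmt.Errorf("ownership for partition %s exists; the claim must carry an etag", po.PartitionID)
        }

        if *po.ETag != *current.ETag {
            // someone else updated the ownership since we listed it
            return nil, nil
        }
    }

    store[po.PartitionID] = po
    return &po, nil
}

func main() {
    etag, stale := "etag-2", "etag-1"
    store := map[string]ownership{
        "0": {PartitionID: "0", OwnerID: "consumer-a", ETag: &etag},
    }

    if owned, _ := claim(store, ownership{PartitionID: "0", OwnerID: "consumer-b", ETag: &stale}); owned == nil {
        fmt.Println("claim lost: etag was stale")
    }
}
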
1 change: 1 addition & 0 deletions sdk/messaging/azeventhubs/internal/errors.go
@@ -148,6 +148,7 @@ var amqpConditionsToRecoveryKind = map[amqp.ErrCond]RecoveryKind{
     amqp.ErrCondNotAllowed:                                  RecoveryKindFatal, // "amqp:not-allowed"
     amqp.ErrCond("com.microsoft:entity-disabled"):           RecoveryKindFatal, // entity is disabled in the portal
     amqp.ErrCond("com.microsoft:session-cannot-be-locked"):  RecoveryKindFatal,
+    amqp.ErrCond("com.microsoft:argument-out-of-range"):     RecoveryKindFatal, // asked for a partition ID that doesn't exist
     errorConditionLockLost:                                  RecoveryKindFatal,
 }

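The new entry classifies com.microsoft:argument-out-of-range, which the service returns when a caller asks for a partition ID that doesn't exist, as fatal so the client stops retrying. A rough sketch of how a condition-to-recovery-kind table like this can drive retry decisions (standalone, simplified types; the mappings other than the new one are illustrative, not copied from the package):

package main

import "fmt"

type recoveryKind int

const (
    recoveryKindNone recoveryKind = iota // transient; retry as-is
    recoveryKindConn                     // rebuild the AMQP connection, then retry
    recoveryKindFatal                    // don't retry; surface the error to the caller
)

// amqpCondition is a simplified stand-in for amqp.ErrCond.
type amqpCondition string

// conditionToRecovery plays the role of amqpConditionsToRecoveryKind above.
var conditionToRecovery = map[amqpCondition]recoveryKind{
    "com.microsoft:server-busy":           recoveryKindNone,  // illustrative mapping
    "amqp:connection:forced":              recoveryKindConn,  // illustrative mapping
    "com.microsoft:argument-out-of-range": recoveryKindFatal, // e.g. a partition ID that doesn't exist
}

// classify looks up the condition, falling back to a connection-level recovery
// for conditions that aren't in the table.
func classify(cond amqpCondition) recoveryKind {
    if rk, ok := conditionToRecovery[cond]; ok {
        return rk
    }
    return recoveryKindConn
}

func main() {
    if classify("com.microsoft:argument-out-of-range") == recoveryKindFatal {
        fmt.Println("fatal: stop retrying and return the error to the caller")
    }
}
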
20 changes: 19 additions & 1 deletion sdk/messaging/azeventhubs/processor_load_balancer.go
@@ -9,6 +9,8 @@ import (
     "math"
     "math/rand"
     "time"
+
+    "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
 )
 
 type processorLoadBalancer struct {
@@ -73,6 +75,7 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
         // - I have too many. We expect to have some stolen from us, but we'll maintain
         //   ownership for now.
         claimMorePartitions = false
+        log.Writef(EventConsumer, "Owns %d/%d, no more needed", len(lbinfo.current), lbinfo.maxAllowed)
     } else if lbinfo.extraPartitionPossible && len(lbinfo.current) == lbinfo.maxAllowed-1 {
         // In the 'extraPartitionPossible' scenario, some consumers will have an extra partition
         // since things don't divide up evenly. We're one under the max, which means we _might_
@@ -81,15 +84,21 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
         // We will attempt to grab _one_ more but only if there are free partitions available
         // or if one of the consumers has more than the max allowed.
         claimMorePartitions = len(lbinfo.unownedOrExpired) > 0 || len(lbinfo.aboveMax) > 0
+        log.Writef(EventConsumer, "Unowned/expired: %d, above max: %d, need to claim: %t",
+            len(lbinfo.unownedOrExpired),
+            len(lbinfo.aboveMax),
+            claimMorePartitions)
     }
 
     ownerships := lbinfo.current
 
     if claimMorePartitions {
         switch lb.strategy {
         case ProcessorStrategyGreedy:
+            log.Writef(EventConsumer, "Using greedy strategy to claim partitions")
             ownerships = lb.greedyLoadBalancer(ctx, lbinfo)
         case ProcessorStrategyBalanced:
+            log.Writef(EventConsumer, "Using balanced strategy to claim partitions")
             o := lb.balancedLoadBalancer(ctx, lbinfo)
 
             if o != nil {
@@ -106,6 +115,8 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
 // getAvailablePartitions finds all partitions that are either completely unowned _or_
 // their ownership is stale.
 func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, partitionIDs []string) (loadBalancerInfo, error) {
+    log.Writef(EventConsumer, "[%s] Listing ownership for %s/%s/%s", lb.details.ClientID, lb.details.FullyQualifiedNamespace, lb.details.EventHubName, lb.details.ConsumerGroup)
+
     ownerships, err := lb.checkpointStore.ListOwnership(ctx, lb.details.FullyQualifiedNamespace, lb.details.EventHubName, lb.details.ConsumerGroup, nil)
 
     if err != nil {
@@ -132,6 +143,9 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par
         groupedByOwner[o.OwnerID] = append(groupedByOwner[o.OwnerID], o)
     }
 
+    numExpired := len(unownedOrExpired)
+    log.Writef(EventConsumer, "Expired: %d", numExpired)
+
     // add in all the unowned partitions
     for _, partID := range partitionIDs {
         if alreadyAdded[partID] {
@@ -149,6 +163,8 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par
         })
     }
 
+    log.Writef(EventConsumer, "Unowned: %d", len(unownedOrExpired)-numExpired)
+
     maxAllowed := len(partitionIDs) / len(groupedByOwner)
     hasRemainder := len(partitionIDs)%len(groupedByOwner) > 0

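The values being logged here fall out of simple integer arithmetic over the ownership listing: maxAllowed is the floor of partitions per active owner, and a nonzero remainder (the extraPartitionPossible case discussed in LoadBalance above) means some owner legitimately holds one extra. A small worked example with hypothetical numbers, assuming the cap is padded by one when there's a remainder:

package main

import "fmt"

func main() {
    // Hypothetical topology: 10 partitions shared by 3 active consumers.
    numPartitions, numOwners := 10, 3

    maxAllowed := numPartitions / numOwners     // 3
    hasRemainder := numPartitions%numOwners > 0 // true: the split isn't even

    if hasRemainder {
        // assumption: pad the cap so one consumer may hold an extra partition
        maxAllowed++ // 4
    }

    // Mirrors the claimMorePartitions decision in LoadBalance: this consumer
    // holds 3, one under the padded max, so it only claims another partition
    // if something is unowned/expired or another consumer is over the limit.
    current, unownedOrExpired, aboveMax := 3, 1, 0
    claimMorePartitions := true

    if current >= maxAllowed {
        claimMorePartitions = false // own enough already; let others steal from us
    } else if hasRemainder && current == maxAllowed-1 {
        claimMorePartitions = unownedOrExpired > 0 || aboveMax > 0
    }

    fmt.Printf("maxAllowed=%d claimMorePartitions=%t\n", maxAllowed, claimMorePartitions)
}
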
@@ -188,6 +204,8 @@ func (lb *processorLoadBalancer) greedyLoadBalancer(ctx context.Context, lbinfo
     ours = append(ours, randomOwnerships...)
 
     if len(ours) < lbinfo.maxAllowed {
+        log.Writef(EventConsumer, "Not enough expired or unowned partitions, will need to steal from other processors")
+
         // if that's not enough then we'll randomly steal from any owners that had partitions
         // above the maximum.
         randomOwnerships := getRandomOwnerships(lb.rnd, lbinfo.aboveMax, lbinfo.maxAllowed-len(ours))
@@ -197,6 +215,7 @@ func (lb *processorLoadBalancer) greedyLoadBalancer(ctx context.Context, lbinfo
     for i := 0; i < len(ours); i++ {
         ours[i] = lb.resetOwnership(ours[i])
     }
+
     return ours
 }
 
@@ -225,7 +244,6 @@ func (lb *processorLoadBalancer) balancedLoadBalancer(ctx context.Context, lbinf
 }
 
 func (lb *processorLoadBalancer) resetOwnership(o Ownership) Ownership {
-    o.ETag = nil
     o.OwnerID = lb.details.ClientID
     return o
 }
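
The single deleted line in resetOwnership is the fix the PR title describes: the load balancer no longer wipes the ETag when it rewrites the owner, so a re-claimed (or stolen) ownership still carries the etag it was listed with and the checkpoint store can check the claim against the stored one, which is exactly what the updated in-memory test store now insists on. A simplified before/after sketch (illustrative types, not the SDK's):

package main

import "fmt"

// ownership is a simplified stand-in for the SDK's Ownership type.
type ownership struct {
    PartitionID string
    OwnerID     string
    ETag        *string
}

// resetOwnershipBefore mirrors the old behavior: clearing the etag meant the
// claim no longer referenced the state it was listed from, so the store could
// not reject it as stale (the updated test store now panics in that case).
func resetOwnershipBefore(o ownership, clientID string) ownership {
    o.ETag = nil
    o.OwnerID = clientID
    return o
}

// resetOwnershipAfter mirrors the fixed behavior: only the owner changes and
// the etag from the ownership listing is preserved for a conditional claim.
func resetOwnershipAfter(o ownership, clientID string) ownership {
    o.OwnerID = clientID
    return o
}

func main() {
    etag := "etag-from-list-ownership" // hypothetical value
    listed := ownership{PartitionID: "0", OwnerID: "consumer-a", ETag: &etag}

    before := resetOwnershipBefore(listed, "consumer-b")
    after := resetOwnershipAfter(listed, "consumer-b")

    fmt.Println("claim carries an etag (before fix):", before.ETag != nil) // false
    fmt.Println("claim carries an etag (after fix): ", after.ETag != nil)  // true
}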