Skip to content

Commit 7f26806

Browse files
committed
Use priority queue to sort consumer group members by number of partition assignments
1 parent 78d3678 commit 7f26806

File tree

1 file changed

+78
-39
lines changed

1 file changed

+78
-39
lines changed

balance_strategy.go

Lines changed: 78 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sarama
22

33
import (
4+
"container/heap"
45
"math"
56
"sort"
67
)
@@ -507,22 +508,14 @@ func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignme
507508

508509
// The assignment should improve the overall balance of the partition assignments to consumers.
509510
func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
510-
updatedSubscriptions := make([]string, len(sortedCurrentSubscriptions))
511-
for i, s := range sortedCurrentSubscriptions {
512-
updatedSubscriptions[i] = s
513-
}
514-
i := 0
515511
for _, memberID := range sortedCurrentSubscriptions {
516512
if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
517-
updatedSubscriptions = removeIndexFromStringSlice(updatedSubscriptions, i)
518513
currentAssignment[memberID] = append(currentAssignment[memberID], partition)
519514
currentPartitionConsumer[partition] = memberID
520-
updatedSubscriptions = append(updatedSubscriptions, memberID)
521515
break
522516
}
523-
i++
524517
}
525-
return updatedSubscriptions
518+
return sortMemberIDsByPartitionAssignments(currentAssignment)
526519
}
527520

528521
// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
@@ -599,44 +592,52 @@ func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, par
599592
// most assigned partitions to those with least)
600593
assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)
601594

602-
// sortedMemberIDs contains a descending-sorted list of consumers based on how many valid partitions are currently assigned to them
603-
sortedMemberIDs := sortMemberIDsByPartitionAssignments(assignments)
604-
for i := len(sortedMemberIDs)/2 - 1; i >= 0; i-- {
605-
opp := len(sortedMemberIDs) - 1 - i
606-
sortedMemberIDs[i], sortedMemberIDs[opp] = sortedMemberIDs[opp], sortedMemberIDs[i]
595+
// use priority-queue to evaluate consumer group members in descending-order based on
596+
// the number of topic partition assignments (i.e. consumers with most assignments first)
597+
pq := make(assignmentPriorityQueue, len(assignments))
598+
i := 0
599+
for consumerID, consumerAssignments := range assignments {
600+
pq[i] = &consumerGroupMember{
601+
id: consumerID,
602+
assignments: consumerAssignments,
603+
index: i,
604+
}
605+
i++
607606
}
607+
heap.Init(&pq)
608+
608609
for {
609610
// loop until no consumer-group members remain
610-
if len(sortedMemberIDs) == 0 {
611+
if pq.Len() == 0 {
611612
break
612613
}
613-
updatedMemberIDs := make([]string, 0)
614-
for _, memberID := range sortedMemberIDs {
615-
// partitions that were assigned to a different consumer last time
616-
prevPartitions := make([]topicPartitionAssignment, 0)
617-
for partition := range partitionsWithADifferentPreviousAssignment {
618-
// from partitions that had a different consumer before, keep only those that are assigned to this consumer now
619-
if memberAssignmentsIncludeTopicPartition(assignments[memberID], partition) {
620-
prevPartitions = append(prevPartitions, partition)
621-
}
614+
member := pq[0]
615+
616+
// partitions that were assigned to a different consumer last time
617+
prevPartitions := make([]topicPartitionAssignment, 0)
618+
for partition := range partitionsWithADifferentPreviousAssignment {
619+
// from partitions that had a different consumer before, keep only those that are assigned to this consumer now
620+
if memberAssignmentsIncludeTopicPartition(member.assignments, partition) {
621+
prevPartitions = append(prevPartitions, partition)
622622
}
623+
}
623624

624-
if len(prevPartitions) > 0 {
625-
// if there is a partition on this consumer that was assigned to another consumer before mark it as good options for reassignment
626-
partition := prevPartitions[0]
627-
prevPartitions = append(prevPartitions[:0], prevPartitions[1:]...)
628-
assignments[memberID] = removeTopicPartitionFromMemberAssignments(assignments[memberID], partition)
629-
sortedPartitions = append(sortedPartitions, partition)
630-
updatedMemberIDs = append(updatedMemberIDs, memberID)
631-
} else if len(assignments[memberID]) > 0 {
632-
// otherwise, mark any other one of the current partitions as a reassignment candidate
633-
partition := assignments[memberID][0]
634-
assignments[memberID] = append(assignments[memberID][:0], assignments[memberID][1:]...)
635-
sortedPartitions = append(sortedPartitions, partition)
636-
updatedMemberIDs = append(updatedMemberIDs, memberID)
637-
}
625+
if len(prevPartitions) > 0 {
626+
// if there is a partition on this consumer that was assigned to another consumer before mark it as good options for reassignment
627+
partition := prevPartitions[0]
628+
prevPartitions = append(prevPartitions[:0], prevPartitions[1:]...)
629+
member.assignments = removeTopicPartitionFromMemberAssignments(member.assignments, partition)
630+
sortedPartitions = append(sortedPartitions, partition)
631+
heap.Fix(&pq, member.index)
632+
} else if len(member.assignments) > 0 {
633+
// otherwise, mark any other one of the current partitions as a reassignment candidate
634+
partition := member.assignments[0]
635+
member.assignments = append(member.assignments[:0], member.assignments[1:]...)
636+
sortedPartitions = append(sortedPartitions, partition)
637+
heap.Fix(&pq, member.index)
638+
} else {
639+
heap.Remove(&pq, 0)
638640
}
639-
sortedMemberIDs = updatedMemberIDs
640641
}
641642

642643
for partition := range partition2AllPotentialConsumers {
@@ -1034,3 +1035,41 @@ nextCand:
10341035
}
10351036
return -1
10361037
}
1038+
1039+
type consumerGroupMember struct {
1040+
index int // the index of the item in the heap
1041+
id string
1042+
assignments []topicPartitionAssignment
1043+
}
1044+
1045+
// A assignmentPriorityQueue implements heap.Interface and holds Items.
1046+
type assignmentPriorityQueue []*consumerGroupMember
1047+
1048+
func (pq assignmentPriorityQueue) Len() int { return len(pq) }
1049+
1050+
func (pq assignmentPriorityQueue) Less(i, j int) bool {
1051+
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
1052+
return len(pq[i].assignments) > len(pq[j].assignments)
1053+
}
1054+
1055+
func (pq assignmentPriorityQueue) Swap(i, j int) {
1056+
pq[i], pq[j] = pq[j], pq[i]
1057+
pq[i].index = i
1058+
pq[j].index = j
1059+
}
1060+
1061+
func (pq *assignmentPriorityQueue) Push(x interface{}) {
1062+
n := len(*pq)
1063+
member := x.(*consumerGroupMember)
1064+
member.index = n
1065+
*pq = append(*pq, member)
1066+
}
1067+
1068+
func (pq *assignmentPriorityQueue) Pop() interface{} {
1069+
old := *pq
1070+
n := len(old)
1071+
member := old[n-1]
1072+
member.index = -1 // for safety
1073+
*pq = old[0 : n-1]
1074+
return member
1075+
}

0 commit comments

Comments
 (0)