Skip to content

Commit 1358013

Browse files
chengjoeydnwe
authored andcommitted
feat(admin): implement leader election api
Signed-off-by: joey <[email protected]>
1 parent 88fd713 commit 1358013

10 files changed

+483
-1
lines changed

admin.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ type ClusterAdmin interface {
9999
// This operation is supported by brokers with version 0.11.0.0 or higher.
100100
DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
101101

102+
// ElectLeaders allows to trigger the election of preferred leaders for a set of partitions.
103+
ElectLeaders(ElectionType, map[string][]int32) (map[string]map[int32]*PartitionResult, error)
104+
102105
// List the consumer groups available in the cluster.
103106
ListConsumerGroups() (map[string]string, error)
104107

@@ -907,6 +910,39 @@ func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]Matchi
907910
return mAcls, nil
908911
}
909912

913+
func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[string][]int32) (map[string]map[int32]*PartitionResult, error) {
914+
request := &ElectLeadersRequest{
915+
Type: electionType,
916+
TopicPartitions: partitions,
917+
TimeoutMs: int32(60000),
918+
}
919+
920+
if ca.conf.Version.IsAtLeast(V2_4_0_0) {
921+
request.Version = 2
922+
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
923+
request.Version = 1
924+
}
925+
926+
var res *ElectLeadersResponse
927+
err := ca.retryOnError(isErrNotController, func() error {
928+
b, err := ca.Controller()
929+
if err != nil {
930+
return err
931+
}
932+
_ = b.Open(ca.client.Config())
933+
934+
res, err = b.ElectLeaders(request)
935+
if isErrNotController(err) {
936+
_, _ = ca.refreshController()
937+
}
938+
return err
939+
})
940+
if err != nil {
941+
return nil, err
942+
}
943+
return res.ReplicaElectionResults, nil
944+
}
945+
910946
func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
911947
groupsPerBroker := make(map[*Broker][]string)
912948

admin_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,6 +1325,45 @@ func TestClusterAdminDeleteAcl(t *testing.T) {
13251325
}
13261326
}
13271327

1328+
func TestElectLeaders(t *testing.T) {
1329+
broker := NewMockBroker(t, 1)
1330+
defer broker.Close()
1331+
1332+
broker.SetHandlerByMap(map[string]MockResponse{
1333+
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
1334+
"MetadataRequest": NewMockMetadataResponse(t).
1335+
SetController(broker.BrokerID()).
1336+
SetBroker(broker.Addr(), broker.BrokerID()),
1337+
"ElectLeadersRequest": NewMockElectLeadersResponse(t),
1338+
})
1339+
1340+
config := NewTestConfig()
1341+
config.Version = V2_4_0_0
1342+
admin, err := NewClusterAdmin([]string{broker.Addr()}, config)
1343+
if err != nil {
1344+
t.Fatal(err)
1345+
}
1346+
1347+
response, err := admin.ElectLeaders(PreferredElection, map[string][]int32{"my_topic": {0, 1}})
1348+
if err != nil {
1349+
t.Fatal(err)
1350+
}
1351+
1352+
partitionResult, ok := response["my_topic"]
1353+
if !ok {
1354+
t.Fatalf("topic missing in response")
1355+
}
1356+
1357+
if len(partitionResult) != 1 {
1358+
t.Fatalf("partition missing in response")
1359+
}
1360+
1361+
err = admin.Close()
1362+
if err != nil {
1363+
t.Fatal(err)
1364+
}
1365+
}
1366+
13281367
func TestDescribeTopic(t *testing.T) {
13291368
seedBroker := NewMockBroker(t, 1)
13301369
defer seedBroker.Close()

broker.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,18 @@ func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsR
689689
return response, nil
690690
}
691691

692+
// ElectLeaders sends aa elect leaders request and returns list partitions elect result
693+
func (b *Broker) ElectLeaders(request *ElectLeadersRequest) (*ElectLeadersResponse, error) {
694+
response := new(ElectLeadersResponse)
695+
696+
err := b.sendAndReceive(request, response)
697+
if err != nil {
698+
return nil, err
699+
}
700+
701+
return response, nil
702+
}
703+
692704
// DeleteRecords send a request to delete records and return delete record
693705
// response or error
694706
func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {

elect_leaders_request.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package sarama
2+
3+
type ElectLeadersRequest struct {
4+
Version int16
5+
Type ElectionType
6+
TopicPartitions map[string][]int32
7+
TimeoutMs int32
8+
}
9+
10+
func (r *ElectLeadersRequest) encode(pe packetEncoder) error {
11+
if r.Version > 0 {
12+
pe.putInt8(int8(r.Type))
13+
}
14+
15+
pe.putCompactArrayLength(len(r.TopicPartitions))
16+
17+
for topic, partitions := range r.TopicPartitions {
18+
if r.Version < 2 {
19+
if err := pe.putString(topic); err != nil {
20+
return err
21+
}
22+
} else {
23+
if err := pe.putCompactString(topic); err != nil {
24+
return err
25+
}
26+
}
27+
28+
if err := pe.putCompactInt32Array(partitions); err != nil {
29+
return err
30+
}
31+
32+
if r.Version >= 2 {
33+
pe.putEmptyTaggedFieldArray()
34+
}
35+
}
36+
37+
pe.putInt32(r.TimeoutMs)
38+
39+
if r.Version >= 2 {
40+
pe.putEmptyTaggedFieldArray()
41+
}
42+
43+
return nil
44+
}
45+
46+
func (r *ElectLeadersRequest) decode(pd packetDecoder, version int16) (err error) {
47+
r.Version = version
48+
if r.Version > 0 {
49+
t, err := pd.getInt8()
50+
if err != nil {
51+
return err
52+
}
53+
r.Type = ElectionType(t)
54+
}
55+
56+
topicCount, err := pd.getCompactArrayLength()
57+
if err != nil {
58+
return err
59+
}
60+
if topicCount > 0 {
61+
r.TopicPartitions = make(map[string][]int32)
62+
for i := 0; i < topicCount; i++ {
63+
var topic string
64+
if r.Version < 2 {
65+
topic, err = pd.getString()
66+
} else {
67+
topic, err = pd.getCompactString()
68+
}
69+
if err != nil {
70+
return err
71+
}
72+
partitionCount, err := pd.getCompactArrayLength()
73+
if err != nil {
74+
return err
75+
}
76+
partitions := make([]int32, partitionCount)
77+
for j := 0; j < partitionCount; j++ {
78+
partition, err := pd.getInt32()
79+
if err != nil {
80+
return err
81+
}
82+
partitions[j] = partition
83+
}
84+
r.TopicPartitions[topic] = partitions
85+
if r.Version >= 2 {
86+
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
87+
return err
88+
}
89+
}
90+
}
91+
}
92+
93+
r.TimeoutMs, err = pd.getInt32()
94+
if err != nil {
95+
return err
96+
}
97+
98+
if r.Version >= 2 {
99+
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
100+
return err
101+
}
102+
}
103+
104+
return nil
105+
}
106+
107+
func (r *ElectLeadersRequest) key() int16 {
108+
return 43
109+
}
110+
111+
func (r *ElectLeadersRequest) version() int16 {
112+
return r.Version
113+
}
114+
115+
func (r *ElectLeadersRequest) headerVersion() int16 {
116+
return 2
117+
}
118+
119+
func (r *ElectLeadersRequest) isValidVersion() bool {
120+
return r.Version >= 0 && r.Version <= 2
121+
}
122+
123+
func (r *ElectLeadersRequest) requiredVersion() KafkaVersion {
124+
switch r.Version {
125+
case 2:
126+
return V2_4_0_0
127+
case 1:
128+
return V0_11_0_0
129+
case 0:
130+
return V0_10_0_0
131+
default:
132+
return V2_4_0_0
133+
}
134+
}

elect_leaders_request_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package sarama
2+
3+
import "testing"
4+
5+
var electLeadersRequestOneTopic = []byte{
6+
0, // preferred election type
7+
2, // 2-1=1 topic
8+
6, 116, 111, 112, 105, 99, // topic name "topic" as compact string
9+
2, // 2-1=1 partition
10+
0, 0, 0, 0, // partition 0
11+
0, 0, // empty tagged fields
12+
0, 39, 16, 0, // timeout 10000
13+
}
14+
15+
func TestElectLeadersRequest(t *testing.T) {
16+
var request = &ElectLeadersRequest{
17+
TimeoutMs: int32(10000),
18+
Version: int16(2),
19+
TopicPartitions: map[string][]int32{
20+
"topic": {0},
21+
},
22+
Type: PreferredElection,
23+
}
24+
25+
testRequest(t, "one topic", request, electLeadersRequestOneTopic)
26+
}

0 commit comments

Comments
 (0)