Skip to content

Commit e4d725d

Browse files
authored
Merge pull request #1586 from dnwe/refresh-controller-on-change
fix: retry topic request on ControllerNotAvailable
2 parents 30e7094 + 5160d7d commit e4d725d

File tree

4 files changed

+196
-51
lines changed

4 files changed

+196
-51
lines changed

admin.go

Lines changed: 103 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ import (
66
"math/rand"
77
"strconv"
88
"sync"
9+
"time"
910
)
1011

1112
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
@@ -134,8 +135,45 @@ func (ca *clusterAdmin) Controller() (*Broker, error) {
134135
return ca.client.Controller()
135136
}
136137

137-
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
138+
func (ca *clusterAdmin) refreshController() (*Broker, error) {
139+
return ca.client.RefreshController()
140+
}
141+
142+
// isErrNoController returns `true` if the given error type unwraps to an
143+
// `ErrNotController` response from Kafka
144+
func isErrNoController(err error) bool {
145+
switch e := err.(type) {
146+
case *TopicError:
147+
return e.Err == ErrNotController
148+
case *TopicPartitionError:
149+
return e.Err == ErrNotController
150+
case KError:
151+
return e == ErrNotController
152+
}
153+
return false
154+
}
155+
156+
// retryOnError will repeatedly call the given (error-returning) func in the
157+
// case that its response is non-nil and retriable (as determined by the
158+
// provided retriable func) up to the maximum number of tries permitted by
159+
// the admin client configuration
160+
func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
161+
var err error
162+
for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
163+
err = fn()
164+
if err == nil || !retriable(err) {
165+
return err
166+
}
167+
Logger.Printf(
168+
"admin/request retrying after %dms... (%d attempts remaining)\n",
169+
ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
170+
time.Sleep(ca.conf.Admin.Retry.Backoff)
171+
continue
172+
}
173+
return err
174+
}
138175

176+
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
139177
if topic == "" {
140178
return ErrInvalidTopic
141179
}
@@ -160,26 +198,31 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
160198
request.Version = 2
161199
}
162200

163-
b, err := ca.Controller()
164-
if err != nil {
165-
return err
166-
}
201+
return ca.retryOnError(isErrNoController, func() error {
202+
b, err := ca.Controller()
203+
if err != nil {
204+
return err
205+
}
167206

168-
rsp, err := b.CreateTopics(request)
169-
if err != nil {
170-
return err
171-
}
207+
rsp, err := b.CreateTopics(request)
208+
if err != nil {
209+
return err
210+
}
172211

173-
topicErr, ok := rsp.TopicErrors[topic]
174-
if !ok {
175-
return ErrIncompleteResponse
176-
}
212+
topicErr, ok := rsp.TopicErrors[topic]
213+
if !ok {
214+
return ErrIncompleteResponse
215+
}
177216

178-
if topicErr.Err != ErrNoError {
179-
return topicErr
180-
}
217+
if topicErr.Err != ErrNoError {
218+
if topicErr.Err == ErrNotController {
219+
_, _ = ca.refreshController()
220+
}
221+
return topicErr
222+
}
181223

182-
return nil
224+
return nil
225+
})
183226
}
184227

185228
func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
@@ -320,7 +363,6 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
320363
}
321364

322365
func (ca *clusterAdmin) DeleteTopic(topic string) error {
323-
324366
if topic == "" {
325367
return ErrInvalidTopic
326368
}
@@ -334,25 +376,31 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {
334376
request.Version = 1
335377
}
336378

337-
b, err := ca.Controller()
338-
if err != nil {
339-
return err
340-
}
379+
return ca.retryOnError(isErrNoController, func() error {
380+
b, err := ca.Controller()
381+
if err != nil {
382+
return err
383+
}
341384

342-
rsp, err := b.DeleteTopics(request)
343-
if err != nil {
344-
return err
345-
}
385+
rsp, err := b.DeleteTopics(request)
386+
if err != nil {
387+
return err
388+
}
346389

347-
topicErr, ok := rsp.TopicErrorCodes[topic]
348-
if !ok {
349-
return ErrIncompleteResponse
350-
}
390+
topicErr, ok := rsp.TopicErrorCodes[topic]
391+
if !ok {
392+
return ErrIncompleteResponse
393+
}
351394

352-
if topicErr != ErrNoError {
353-
return topicErr
354-
}
355-
return nil
395+
if topicErr != ErrNoError {
396+
if topicErr == ErrNotController {
397+
_, _ = ca.refreshController()
398+
}
399+
return topicErr
400+
}
401+
402+
return nil
403+
})
356404
}
357405

358406
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
@@ -368,26 +416,31 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
368416
Timeout: ca.conf.Admin.Timeout,
369417
}
370418

371-
b, err := ca.Controller()
372-
if err != nil {
373-
return err
374-
}
419+
return ca.retryOnError(isErrNoController, func() error {
420+
b, err := ca.Controller()
421+
if err != nil {
422+
return err
423+
}
375424

376-
rsp, err := b.CreatePartitions(request)
377-
if err != nil {
378-
return err
379-
}
425+
rsp, err := b.CreatePartitions(request)
426+
if err != nil {
427+
return err
428+
}
380429

381-
topicErr, ok := rsp.TopicPartitionErrors[topic]
382-
if !ok {
383-
return ErrIncompleteResponse
384-
}
430+
topicErr, ok := rsp.TopicPartitionErrors[topic]
431+
if !ok {
432+
return ErrIncompleteResponse
433+
}
385434

386-
if topicErr.Err != ErrNoError {
387-
return topicErr
388-
}
435+
if topicErr.Err != ErrNoError {
436+
if topicErr.Err == ErrNotController {
437+
_, _ = ca.refreshController()
438+
}
439+
return topicErr
440+
}
389441

390-
return nil
442+
return nil
443+
})
391444
}
392445

393446
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {

admin_test.go

Lines changed: 48 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1104,3 +1104,51 @@ func TestDeleteConsumerGroup(t *testing.T) {
11041104
}
11051105

11061106
}
1107+
1108+
// TestRefreshMetaDataWithDifferentController ensures that the cached
1109+
// controller can be forcibly updated from Metadata by the admin client
1110+
func TestRefreshMetaDataWithDifferentController(t *testing.T) {
1111+
seedBroker1 := NewMockBroker(t, 1)
1112+
seedBroker2 := NewMockBroker(t, 2)
1113+
defer seedBroker1.Close()
1114+
defer seedBroker2.Close()
1115+
1116+
seedBroker1.SetHandlerByMap(map[string]MockResponse{
1117+
"MetadataRequest": NewMockMetadataResponse(t).
1118+
SetController(seedBroker1.BrokerID()).
1119+
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
1120+
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
1121+
})
1122+
1123+
config := NewConfig()
1124+
config.Version = V1_1_0_0
1125+
1126+
client, err := NewClient([]string{seedBroker1.Addr()}, config)
1127+
if err != nil {
1128+
t.Fatal(err)
1129+
}
1130+
1131+
ca := clusterAdmin{client: client, conf: config}
1132+
1133+
if b, _ := ca.Controller(); seedBroker1.BrokerID() != b.ID() {
1134+
t.Fatalf("expected cached controller to be %d rather than %d",
1135+
seedBroker1.BrokerID(), b.ID())
1136+
}
1137+
1138+
seedBroker1.SetHandlerByMap(map[string]MockResponse{
1139+
"MetadataRequest": NewMockMetadataResponse(t).
1140+
SetController(seedBroker2.BrokerID()).
1141+
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
1142+
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
1143+
})
1144+
1145+
if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
1146+
t.Fatalf("expected refreshed controller to be %d rather than %d",
1147+
seedBroker2.BrokerID(), b.ID())
1148+
}
1149+
1150+
if b, _ := ca.Controller(); seedBroker2.BrokerID() != b.ID() {
1151+
t.Fatalf("expected cached controller to be %d rather than %d",
1152+
seedBroker2.BrokerID(), b.ID())
1153+
}
1154+
}

client.go

Lines changed: 36 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,9 +17,15 @@ type Client interface {
1717
// altered after it has been created.
1818
Config() *Config
1919

20-
// Controller returns the cluster controller broker. Requires Kafka 0.10 or higher.
20+
// Controller returns the cluster controller broker. It will return a
21+
// locally cached value if it's available. You can call RefreshController
22+
// to update the cached value. Requires Kafka 0.10 or higher.
2123
Controller() (*Broker, error)
2224

25+
// RefreshController retrieves the cluster controller from fresh metadata
26+
// and stores it in the local cache. Requires Kafka 0.10 or higher.
27+
RefreshController() (*Broker, error)
28+
2329
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
2430
Brokers() []*Broker
2531

@@ -487,6 +493,35 @@ func (client *client) Controller() (*Broker, error) {
487493
return controller, nil
488494
}
489495

496+
// deregisterController removes the broker registered under the cached controllerID from the client's broker map
497+
func (client *client) deregisterController() {
498+
client.lock.Lock()
499+
defer client.lock.Unlock()
500+
delete(client.brokers, client.controllerID)
501+
}
502+
503+
// RefreshController retrieves the cluster controller from fresh metadata
504+
// and stores it in the local cache. Requires Kafka 0.10 or higher.
505+
func (client *client) RefreshController() (*Broker, error) {
506+
if client.Closed() {
507+
return nil, ErrClosedClient
508+
}
509+
510+
client.deregisterController()
511+
512+
if err := client.refreshMetadata(); err != nil {
513+
return nil, err
514+
}
515+
516+
controller := client.cachedController()
517+
if controller == nil {
518+
return nil, ErrControllerNotAvailable
519+
}
520+
521+
_ = controller.Open(client.conf)
522+
return controller, nil
523+
}
524+
490525
func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
491526
if client.Closed() {
492527
return nil, ErrClosedClient

config.go

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,13 @@ var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
2121
type Config struct {
2222
// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
2323
Admin struct {
24+
Retry struct {
25+
// The maximum number of times to attempt sending (retriable) admin requests, counting the initial attempt (default 5).
26+
// Similar to the `retries` setting of the JVM AdminClientConfig.
27+
Max int
28+
// Backoff time between retries of a failed request (default 100ms)
29+
Backoff time.Duration
30+
}
2431
// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
2532
// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
2633
Timeout time.Duration
@@ -408,6 +415,8 @@ type Config struct {
408415
func NewConfig() *Config {
409416
c := &Config{}
410417

418+
c.Admin.Retry.Max = 5
419+
c.Admin.Retry.Backoff = 100 * time.Millisecond
411420
c.Admin.Timeout = 3 * time.Second
412421

413422
c.Net.MaxOpenRequests = 5

0 commit comments

Comments
 (0)