Skip to content

Commit 5cd4d86

Browse files
authored
Merge pull request #1125 from andyxning/fix_topic_metadata_on-demand_fetch
fix producer topic metadata on-demand fetch when topic error happens
2 parents f7df95c + ce245e8 commit 5cd4d86

File tree

1 file changed

+31
-5
lines changed

1 file changed

+31
-5
lines changed

client.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,11 @@ type client struct {
100100
seedBrokers []*Broker
101101
deadSeeds []*Broker
102102

103-
controllerID int32 // cluster controller broker id
104-
brokers map[int32]*Broker // maps broker ids to brokers
105-
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
106-
coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs
103+
controllerID int32 // cluster controller broker id
104+
brokers map[int32]*Broker // maps broker ids to brokers
105+
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
106+
metadataTopics map[string]none // topics that need to collect metadata
107+
coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs
107108

108109
// If the number of partitions is large, we can get some churn calling cachedPartitions,
109110
// so the result is cached. It is important to update this value whenever metadata is changed
@@ -136,6 +137,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
136137
closed: make(chan none),
137138
brokers: make(map[int32]*Broker),
138139
metadata: make(map[string]map[int32]*PartitionMetadata),
140+
metadataTopics: make(map[string]none),
139141
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
140142
coordinators: make(map[string]int32),
141143
}
@@ -207,6 +209,7 @@ func (client *client) Close() error {
207209

208210
client.brokers = nil
209211
client.metadata = nil
212+
client.metadataTopics = nil
210213

211214
return nil
212215
}
@@ -231,6 +234,22 @@ func (client *client) Topics() ([]string, error) {
231234
return ret, nil
232235
}
233236

237+
func (client *client) MetadataTopics() ([]string, error) {
238+
if client.Closed() {
239+
return nil, ErrClosedClient
240+
}
241+
242+
client.lock.RLock()
243+
defer client.lock.RUnlock()
244+
245+
ret := make([]string, 0, len(client.metadataTopics))
246+
for topic := range client.metadataTopics {
247+
ret = append(ret, topic)
248+
}
249+
250+
return ret, nil
251+
}
252+
234253
func (client *client) Partitions(topic string) ([]int32, error) {
235254
if client.Closed() {
236255
return nil, ErrClosedClient
@@ -649,7 +668,7 @@ func (client *client) refreshMetadata() error {
649668
topics := []string{}
650669

651670
if !client.conf.Metadata.Full {
652-
if specificTopics, err := client.Topics(); err != nil {
671+
if specificTopics, err := client.MetadataTopics(); err != nil {
653672
return err
654673
} else if len(specificTopics) == 0 {
655674
return ErrNoTopicsToUpdateMetadata
@@ -732,9 +751,16 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo
732751

733752
if allKnownMetaData {
734753
client.metadata = make(map[string]map[int32]*PartitionMetadata)
754+
client.metadataTopics = make(map[string]none)
735755
client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
736756
}
737757
for _, topic := range data.Topics {
758+
// topics must be added firstly to `metadataTopics` to guarantee that all
759+
// requested topics must be recorded to keep them trackable for periodically
760+
// metadata refresh.
761+
if _, exists := client.metadataTopics[topic.Name]; !exists {
762+
client.metadataTopics[topic.Name] = none{}
763+
}
738764
delete(client.metadata, topic.Name)
739765
delete(client.cachedPartitionsResults, topic.Name)
740766

0 commit comments

Comments
 (0)