fix producer topic metadata on-demand fetch when topic error happens #1125

Merged · 1 commit · Jul 12, 2018
client.go: 36 changes (31 additions & 5 deletions)
@@ -100,10 +100,11 @@ type client struct {
 	seedBrokers []*Broker
 	deadSeeds   []*Broker
 
-	controllerID int32                                   // cluster controller broker id
-	brokers      map[int32]*Broker                       // maps broker ids to brokers
-	metadata     map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
-	coordinators map[string]int32                        // Maps consumer group names to coordinating broker IDs
+	controllerID   int32                                   // cluster controller broker id
+	brokers        map[int32]*Broker                       // maps broker ids to brokers
+	metadata       map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
+	metadataTopics map[string]none                         // topics that need to collect metadata
+	coordinators   map[string]int32                        // Maps consumer group names to coordinating broker IDs
 
 	// If the number of partitions is large, we can get some churn calling cachedPartitions,
 	// so the result is cached. It is important to update this value whenever metadata is changed
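For context on the new field's type: `none` is Sarama's empty-struct placeholder (it also appears in `make(chan none)` below), so `map[string]none` behaves as an allocation-free set of topic names. A minimal, self-contained sketch of the idiom, with topic names invented for illustration:

```go
package main

import "fmt"

// none mirrors Sarama's zero-size empty-struct type.
type none struct{}

func main() {
	metadataTopics := make(map[string]none)

	// Recording a topic costs no per-value memory: none{} is zero bytes.
	metadataTopics["orders"] = none{}
	metadataTopics["payments"] = none{}

	// Membership test.
	if _, ok := metadataTopics["orders"]; ok {
		fmt.Println("orders is tracked for metadata refresh")
	}
}
```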
@@ -136,6 +137,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
 		closed:                  make(chan none),
 		brokers:                 make(map[int32]*Broker),
 		metadata:                make(map[string]map[int32]*PartitionMetadata),
+		metadataTopics:          make(map[string]none),
 		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
 		coordinators:            make(map[string]int32),
 	}
@@ -207,6 +209,7 @@ func (client *client) Close() error {
 
 	client.brokers = nil
 	client.metadata = nil
+	client.metadataTopics = nil
 
 	return nil
 }
@@ -231,6 +234,22 @@ func (client *client) Topics() ([]string, error) {
 	return ret, nil
 }
 
+func (client *client) MetadataTopics() ([]string, error) {
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	client.lock.RLock()
+	defer client.lock.RUnlock()
+
+	ret := make([]string, 0, len(client.metadataTopics))
+	for topic := range client.metadataTopics {
+		ret = append(ret, topic)
+	}
+
+	return ret, nil
+}
+
 func (client *client) Partitions(topic string) ([]int32, error) {
 	if client.Closed() {
 		return nil, ErrClosedClient
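Assuming the rest of this PR exposes the new accessor on the `Client` interface, a caller can inspect which topics the client is currently tracking for metadata. A hedged usage sketch; the broker address and topic usage are illustrative:

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Metadata.Full = false // only fetch metadata for topics actually in use

	client, err := sarama.NewClient([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Lists topics recorded in metadataTopics, including ones whose
	// last metadata fetch returned a topic error.
	topics, err := client.MetadataTopics()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("tracked topics:", topics)
}
```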
@@ -645,7 +664,7 @@ func (client *client) refreshMetadata() error {
 	topics := []string{}
 
 	if !client.conf.Metadata.Full {
-		if specificTopics, err := client.Topics(); err != nil {
+		if specificTopics, err := client.MetadataTopics(); err != nil {
 			return err
 		} else if len(specificTopics) == 0 {
 			return ErrNoTopicsToUpdateMetadata
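This one-line switch is the heart of the fix: `Topics()` only lists topics that currently have cached metadata, so a topic whose fetch returned an error silently fell out of the periodic refresh, while `MetadataTopics()` lists every topic that was ever requested. A simplified sketch of the refresh decision; the surrounding function is invented for illustration and only the `Topics()`-vs-`MetadataTopics()` distinction mirrors the diff:

```go
package sketch

import "github.com/Shopify/sarama"

func refreshTrackedMetadata(client sarama.Client, fullRefresh bool) error {
	topics := []string{} // an empty list asks the broker for all topics

	if !fullRefresh {
		// Use the requested-topic set rather than the successfully-cached
		// set, so topics whose last fetch returned an error are retried.
		specific, err := client.MetadataTopics()
		if err != nil {
			return err
		}
		if len(specific) == 0 {
			return nil // nothing requested yet, nothing to refresh
		}
		topics = specific
	}

	return client.RefreshMetadata(topics...)
}
```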
@@ -728,9 +747,16 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (err error) {
 
 	if allKnownMetaData {
 		client.metadata = make(map[string]map[int32]*PartitionMetadata)
+		client.metadataTopics = make(map[string]none)
 		client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
 	}
 	for _, topic := range data.Topics {
+		// Topics must be added to `metadataTopics` first, so that every
+		// requested topic is recorded and stays trackable for periodic
+		// metadata refreshes, even if its metadata fetch fails below.
+		if _, exists := client.metadataTopics[topic.Name]; !exists {
+			client.metadataTopics[topic.Name] = none{}
+		}
 		delete(client.metadata, topic.Name)
 		delete(client.cachedPartitionsResults, topic.Name)
 
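To see the guarantee the new comment describes, here is a trimmed-down, hypothetical model of the `updateMetadata` flow (the `miniClient` type and fake metadata values are stand-ins, not Sarama code): even when a topic error leaves no metadata cached, the topic stays tracked for the next refresh.

```go
package main

import "fmt"

type none struct{}

type miniClient struct {
	metadata       map[string]map[int32]string // partition id -> fake metadata
	metadataTopics map[string]none
}

func (c *miniClient) updateMetadata(topic string, topicErr bool) {
	// Record the topic first so it stays trackable across errors.
	if _, exists := c.metadataTopics[topic]; !exists {
		c.metadataTopics[topic] = none{}
	}
	delete(c.metadata, topic)

	if topicErr {
		return // error: no metadata cached, but the topic remains tracked
	}
	c.metadata[topic] = map[int32]string{0: "leader=1"}
}

func main() {
	c := &miniClient{
		metadata:       make(map[string]map[int32]string),
		metadataTopics: make(map[string]none),
	}
	c.updateMetadata("flaky-topic", true) // first fetch fails
	_, cached := c.metadata["flaky-topic"]
	_, tracked := c.metadataTopics["flaky-topic"]
	fmt.Printf("cached=%v tracked=%v\n", cached, tracked) // cached=false tracked=true
}
```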