Skip to content

Commit 00180fc

Browse files
committed
Clear all metadata when we have the latest info
1 parent f7466ea commit 00180fc

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

client.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -649,8 +649,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
649649

650650
switch err.(type) {
651651
case nil:
652+
allKnownMetaData := len(topics) == 0
652653
// valid response, use it
653-
shouldRetry, err := client.updateMetadata(response)
654+
shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
654655
if shouldRetry {
655656
Logger.Println("client/metadata found some partitions to be leaderless")
656657
return retry(err) // note: err can be nil
@@ -674,7 +675,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
674675
}
675676

676677
// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
677-
func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err error) {
678+
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
678679
client.lock.Lock()
679680
defer client.lock.Unlock()
680681

@@ -685,7 +686,10 @@ func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err er
685686
for _, broker := range data.Brokers {
686687
client.registerBroker(broker)
687688
}
688-
689+
if allKnownMetaData {
690+
client.metadata = make(map[string]map[int32]*PartitionMetadata)
691+
client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
692+
}
689693
for _, topic := range data.Topics {
690694
delete(client.metadata, topic.Name)
691695
delete(client.cachedPartitionsResults, topic.Name)

0 commit comments

Comments
 (0)