Skip to content

Commit 635bcf3

Browse files
authored
Merge pull request #2076 from Shopify/dnwe/fix-fetch-from-follower
fix: only update preferredReadReplica if valid
2 parents 556ddf7 + cbcd734 commit 635bcf3

File tree

2 files changed

+16
-11
lines changed

2 files changed

+16
-11
lines changed

consumer.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,16 +132,17 @@ func (c *consumer) Partitions(topic string) ([]int32, error) {
132132

133133
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
134134
child := &partitionConsumer{
135-
consumer: c,
136-
conf: c.conf,
137-
topic: topic,
138-
partition: partition,
139-
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
140-
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
141-
feeder: make(chan *FetchResponse, 1),
142-
trigger: make(chan none, 1),
143-
dying: make(chan none),
144-
fetchSize: c.conf.Consumer.Fetch.Default,
135+
consumer: c,
136+
conf: c.conf,
137+
topic: topic,
138+
partition: partition,
139+
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
140+
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
141+
feeder: make(chan *FetchResponse, 1),
142+
preferredReadReplica: invalidPreferredReplicaID,
143+
trigger: make(chan none, 1),
144+
dying: make(chan none),
145+
fetchSize: c.conf.Consumer.Fetch.Default,
145146
}
146147

147148
if err := child.chooseStartingOffset(offset); err != nil {
@@ -605,7 +606,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
605606

606607
consumerBatchSizeMetric.Update(int64(nRecs))
607608

608-
child.preferredReadReplica = block.PreferredReadReplica
609+
if block.PreferredReadReplica != invalidPreferredReplicaID {
610+
child.preferredReadReplica = block.PreferredReadReplica
611+
}
609612

610613
if nRecs == 0 {
611614
partialTrailingMessage, err := block.isPartial()

fetch_response.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"time"
66
)
77

8+
const invalidPreferredReplicaID = -1
9+
810
type AbortedTransaction struct {
911
ProducerID int64
1012
FirstOffset int64

0 commit comments

Comments
 (0)