Skip to content

Commit 3aea989

Browse files
fix(producer): treat ErrKafkaStorageError as retriable (#2939)
This is retriable according to the spec: https://kafka.apache.org/protocol.html Signed-off-by: Richard Artoul <[email protected]>
1 parent 9ded629 commit 3aea989

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

async_producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,7 +1101,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
11011101
bp.parent.returnSuccesses(pSet.msgs)
11021102
// Retriable errors
11031103
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
1104-
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
1104+
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
11051105
if bp.parent.conf.Producer.Retry.Max <= 0 {
11061106
bp.parent.abandonBrokerConnection(bp.broker)
11071107
bp.parent.returnErrors(pSet.msgs, block.Err)
@@ -1134,7 +1134,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
11341134

11351135
switch block.Err {
11361136
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
1137-
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
1137+
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
11381138
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
11391139
bp.broker.ID(), topic, partition, block.Err)
11401140
if bp.currentRetries[topic] == nil {

0 commit comments

Comments
 (0)