
Infinite loop with malformed message to Avro formatted topic with Retry and DLT configuration #2415


Closed
cbordadorahappymoney opened this issue Sep 26, 2022 · 2 comments · Fixed by #2416

Comments


cbordadorahappymoney commented Sep 26, 2022

In what version(s) of Spring for Apache Kafka are you seeing this issue?
version: 2.8.8

Describe the bug
I am using an Avro schema for the Kafka messages, and the consumer is set up with Spring RetryTopic and DLT.
When the consumer encounters an exception while processing a message, the message goes into the retry topic and then the DLT for handling, as expected.
However, when it later receives a malformed message that cannot be deserialized, Spring Kafka retries publishing the message to the DLT indefinitely. The application then has to be stopped and the topic cleared before it can be started again.

The log message is as follows:

DeadLetterPublishingRecovererFactory - Record: topic = request.example, partition = 0, offset = 2, main topic = request.example threw an error at topic request.example and won't be retried. Sending to DLT with name request.example-dlt. {}
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

LoggingProducerListener - Exception thrown when sending a message with key='fake-1' and payload='byte[7]' to topic request.example-dlt and partition 0: {}
org.apache.kafka.common.errors.InvalidConfigurationException: Schema being registered is incompatible with an earlier schema for subject "request.example-dlt-value"; error code: 409

Dead-letter publication to request.example-dltfailed for: request.example-0@2 {}
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.InvalidConfigurationException: Schema being registered is incompatible with an earlier schema for subject "request.example-dlt-value"; error code: 409

DefaultErrorHandler - Recovery of record (request.example-0@2) failed {}
org.springframework.kafka.KafkaException: Dead-letter publication to request.example-dltfailed for: request.example-0@2

To Reproduce

  1. Run the new application. It generates the new topics.
  2. Send a well-formed message that the consumer processes successfully.
  3. Send a well-formed message for which the consumer throws a RuntimeException. It is retried and then enters the DLT successfully.
  4. Send a malformed message (e.g. plain text such as "testing") that is not in Avro format. Spring Kafka encounters a deserialization exception, skips the retries, and sends the record directly to the DLT. This repeats until the application is killed.

Expected behavior
Spring Kafka should not retry publishing a message to the same DLT indefinitely.
It would be desirable to configure a dedicated dead letter topic for DeserializationExceptions, so they can be handled separately from other types of exceptions. If Avro prevents creating a topic not tied to a schema, then allowing a handler function to be configured would be the next option; ideally the handler would be provided with the binary content of the message, the topic name, offset, partition, key, headers, and exception information.

Sample
Here's a link to sample code with instructions to reproduce the problem:
https://github.com/clbtrain/spring-kafka-avro-dlt-infinite

garyrussell (Contributor) commented Sep 27, 2022

Publishing a record that failed deserialization should not use a producer configured with the Avro serializer; it must use a ByteArraySerializer, since the raw, undeserialized byte[] is published.

One workaround would be a custom serializer that looks at the record.value() and delegates to either the Avro serializer or the ByteArraySerializer, depending on its type; for example, the sketch below.
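A minimal sketch of such a delegating serializer, assuming Confluent's KafkaAvroSerializer (the class name is illustrative; adapt to whatever Avro serializer you use):

```java
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;

// Routes byte[] values (the raw payload of a record that failed
// deserialization) to a ByteArraySerializer; everything else to Avro.
public class AvroOrBytesSerializer implements Serializer<Object> {

    private final ByteArraySerializer bytes = new ByteArraySerializer();
    private final KafkaAvroSerializer avro = new KafkaAvroSerializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        this.avro.configure(configs, isKey); // passes schema.registry.url etc.
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return data instanceof byte[]
                ? this.bytes.serialize(topic, (byte[]) data)
                : this.avro.serialize(topic, data);
    }

    @Override
    public void close() {
        this.avro.close();
    }
}
```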

The DLPR (DeadLetterPublishingRecoverer) already supports injection of a map of templates (where the key is the value's type and the map value is the template to use for that type), but this is not exposed by the DeadLetterPublishingRecovererFactory.
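For illustration, a sketch of wiring a DLPR directly with such a map; the two templates are assumptions here (one built with a ByteArraySerializer, one with the Avro serializer):

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

// Hypothetical factory method; bytesTemplate/avroTemplate are assumed to exist.
public static DeadLetterPublishingRecoverer dlpr(
        KafkaOperations<?, ?> bytesTemplate, KafkaOperations<?, ?> avroTemplate) {

    // Ordered map: most specific type first, since byte[] is also an Object.
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(byte[].class, bytesTemplate); // records that failed deserialization
    templates.put(Object.class, avroTemplate);  // normally deserialized values
    return new DeadLetterPublishingRecoverer(templates);
}
```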

I think we can enhance the DLPR to deal with this internally, but for now, the above workaround should work for you.

garyrussell (Contributor) commented:

Actually, we already have the DelegatingByTypeSerializer - simply use that, configured to use a ByteArraySerializer for byte[] and your Avro serializer for all other types.
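For example, a sketch assuming Confluent's KafkaAvroSerializer and standard producer properties (method and parameter names are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;

// producerProps is assumed to hold the usual bootstrap.servers /
// schema.registry.url settings.
public static KafkaTemplate<String, Object> template(Map<String, Object> producerProps) {
    // Ordered map: check byte[] before the Object catch-all when the
    // 'assignable' flag (second constructor argument) is true.
    Map<Class<?>, Serializer<?>> delegates = new LinkedHashMap<>();
    delegates.put(byte[].class, new ByteArraySerializer()); // raw failed payloads
    delegates.put(Object.class, new KafkaAvroSerializer()); // everything else

    DefaultKafkaProducerFactory<String, Object> pf = new DefaultKafkaProducerFactory<>(
            producerProps,
            new StringSerializer(),
            new DelegatingByTypeSerializer(delegates, true)); // true = match subtypes
    return new KafkaTemplate<>(pf);
}
```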

We will document that it should be used with an ErrorHandlingDeserializer and a DLPR.
