Skip to content

Cloud Stream Dead Letter Topic support #358

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 15, 2021
Merged

Cloud Stream Dead Letter Topic support #358

merged 7 commits into from
Mar 15, 2021

Conversation

ttomsu
Copy link

@ttomsu ttomsu commented Mar 10, 2021

Fixes #158
Two commits here - one for the production code and one for the sample.

Since Cloud Stream handles all things related to topic/subscription creation, we need a way to support dead letter topics. This PR expands the PubSubConsumerProperties to allow setting deadLetterPolicy.deadLetterTopic and deadLetterPolicy.maxDeliveryAttempts

One of the primary changes is consolidating the createSubscription call into its own function which applies the above policy (if available). Because we're potentially creating another topic, I removed the Optional return type from ensureTopicExists and instead have it call itself recursively to ensure we nearly always get a Topic object back and throw an exception in the extremely rare case where we first get an AlreadyExistsException but then still get null back from getTopic(). This is a slight change in behavior because were previously assuming everything was fine and just logging a warning, which we're no longer doing - like I said, though, this should be a pretty rare case.

The tests were refactored to remove the JUnit-4-only @Rule for exception catching, and use AssertJ instead.


The sample code is a combination of the simplicity of the legacy (annotation-based) style and the current functional style, but without the module split out of the other functional sample.

It ends up with 3 functions - one to publish the message, one to receive the message from the main topic, and one to receive the message from the dead letter topic. An integration test ensures this, like all the other tests, starts and runs the application passes as expected.

This PR is already pretty big, so I'll follow up with another PR to pull in some code samples into the refdoc to fix a TODO I put in there earlier.

@ttomsu ttomsu requested review from dzou, meltsufin and elefeint March 10, 2021 20:49
public class SinkExample {

@Bean
public Consumer<Message<UserMessage>> logUserMessage() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also be a Consumer<UserMessage> like the below deadLetterMessages() for consistency? Or does one way only take the wrapped message object?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately no, because I need the ability to nack() the message. I've added a utility method that I'd like to get your opinion on for more reliably getting an object I can nack - though I'm not quite sure how this interacts with the acking/nacking happening within the framework. Maybe @elefeint would have some insight.

Copy link
Member

@meltsufin meltsufin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool! Just some minor comments.

DeadLetterPolicy.Builder dlpBuilder = DeadLetterPolicy.newBuilder().setDeadLetterTopic(dlTopic.getName());

Integer maxAttempts = deadLetterPolicy.getMaxDeliveryAttempts();
if (maxAttempts != null && maxAttempts > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would just let negative values for maxAttempts through. Let it fail there, or maybe there'll be special meaning for negative values.

This code sample demonstrates how to use the Spring Cloud Stream binder for Google Cloud Pub/Sub with Dead Letter Topics.

The sample app prompts a user for a message and user name.
That data is added to a `UserMessage` object, together with the time of message creation, and is sent through Google Cloud Pub/Sub to a sink that `nack()`s the message.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
That data is added to a `UserMessage` object, together with the time of message creation, and is sent through Google Cloud Pub/Sub to a sink that `nack()`s the message.
That data is added to a `UserMessage` object, together with the time of message creation, and is sent through Google Cloud Pub/Sub to a sink that `nack()`s the message, which eventually causes the message to be routed to the dead letter topic.

.orElseThrow(() -> new IllegalStateException("Could not find original PubSubMessage."));
Integer deliveryAttempt = Subscriber.getDeliveryAttempt(nackable.getPubsubMessage());

// Typically you wouldn't nack() every message, but this demonstrates the Pub/Sub system retrying delivery
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't stream automatically ack every message? Are you disabling that?

}

@Bean
Supplier<Flux<UserMessage>> generateUserMessages(Sinks.Many<UserMessage> postOffice) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do the same without introducing reactive? Would it be simpler?

@ttomsu ttomsu requested a review from meltsufin March 11, 2021 19:24
+
```
Publishing message from batman
Nacking message 1 from batman at 2021-03-10T15:33:00.479420: to the batcave!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's going to say (attempt 1) now.

spring.cloud.stream.gcp.pubsub.bindings.logUserMessage-in-0.consumer.deadLetterPolicy.maxDeliveryAttempts=6
# We set this to MANUAL ackMode to take control over the ack()/nack()-ing of messages for this demo.
# You do not need to do this in a normal application.
spring.cloud.stream.gcp.pubsub.bindings.logUserMessage-in-0.consumer.ackMode=MANUAL
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised it worked without this setting for you.

@ttomsu
Copy link
Author

ttomsu commented Mar 15, 2021

I'm surprised it worked without this setting for you.

it worked because I nacked the message before the framework could ack it. I'm not sure if Pub/Sub threw any errors for attempting to ack and already nacked message, but it apparently redelivered it all the same.

@sonarqubecloud
Copy link

Kudos, SonarCloud Quality Gate passed!

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 0 Code Smells

94.9% 94.9% Coverage
0.0% 0.0% Duplication

@ttomsu ttomsu merged commit 939df3e into main Mar 15, 2021
@ttomsu ttomsu deleted the cloud-stream-dlq branch March 15, 2021 15:18
kateryna216 added a commit to kateryna216/spring-cloud-gcp that referenced this pull request Oct 20, 2022
* Enable Dead Letter Queue on subscription created by Spring Cloud Stream
* Adds dead letter topic sample and integration test
prash-mi pushed a commit that referenced this pull request Jun 20, 2023
Bumps [checkstyle](https://github.com/checkstyle/checkstyle) from 8.41.1 to 8.42.
- [Release notes](https://github.com/checkstyle/checkstyle/releases)
- [Commits](checkstyle/checkstyle@checkstyle-8.41.1...checkstyle-8.42)

Signed-off-by: dependabot[bot] <[email protected]>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add Support for Pub/Sub Dead Letter Queues
4 participants