-
Notifications
You must be signed in to change notification settings - Fork 336
Cloud Stream Dead Letter Topic support #358
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
...n/java/com/google/cloud/spring/stream/binder/pubsub/properties/PubSubConsumerProperties.java
Show resolved
Hide resolved
...java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java
Show resolved
Hide resolved
public class SinkExample { | ||
|
||
@Bean | ||
public Consumer<Message<UserMessage>> logUserMessage() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this also be a Consumer<UserMessage>
like the below deadLetterMessages()
for consistency? Or does one way only take the wrapped message object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately no, because I need the ability to nack()
the message. I've added a utility method that I'd like to get your opinion on for more reliably getting an object I can nack - though I'm not quite sure how this interacts with the acking/nacking happening within the framework. Maybe @elefeint would have some insight.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very cool! Just some minor comments.
...n/java/com/google/cloud/spring/stream/binder/pubsub/properties/PubSubConsumerProperties.java
Show resolved
Hide resolved
...java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java
Show resolved
Hide resolved
...java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java
Outdated
Show resolved
Hide resolved
...java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java
Outdated
Show resolved
Hide resolved
DeadLetterPolicy.Builder dlpBuilder = DeadLetterPolicy.newBuilder().setDeadLetterTopic(dlTopic.getName()); | ||
|
||
Integer maxAttempts = deadLetterPolicy.getMaxDeliveryAttempts(); | ||
if (maxAttempts != null && maxAttempts > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I would just let negative values for maxAttempts
through. Let it fail there, or maybe there'll be special meaning for negative values.
spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/README.adoc
Outdated
Show resolved
Hide resolved
This code sample demonstrates how to use the Spring Cloud Stream binder for Google Cloud Pub/Sub with Dead Letter Topics. | ||
|
||
The sample app prompts a user for a message and user name. | ||
That data is added to a `UserMessage` object, together with the time of message creation, and is sent through Google Cloud Pub/Sub to a sink that `nack()`s the message. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That data is added to a `UserMessage` object, together with the time of message creation, and is sent through Google Cloud Pub/Sub to a sink that `nack()`s the message. | |
That data is added to a `UserMessage` object, together with the time of message creation, and is sent through Google Cloud Pub/Sub to a sink that `nack()`s the message, which eventually causes the message to be routed to the dead letter topic. |
...ng-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java
Outdated
Show resolved
Hide resolved
.orElseThrow(() -> new IllegalStateException("Could not find original PubSubMessage.")); | ||
Integer deliveryAttempt = Subscriber.getDeliveryAttempt(nackable.getPubsubMessage()); | ||
|
||
// Typically you wouldn't nack() every message, but this demonstrates the Pub/Sub system retrying delivery |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't stream automatically ack every message? Are you disabling that?
} | ||
|
||
@Bean | ||
Supplier<Flux<UserMessage>> generateUserMessages(Sinks.Many<UserMessage> postOffice) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do the same without introducing reactive? Would it be simpler?
...java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java
Outdated
Show resolved
Hide resolved
...java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java
Show resolved
Hide resolved
+ | ||
``` | ||
Publishing message from batman | ||
Nacking message 1 from batman at 2021-03-10T15:33:00.479420: to the batcave! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's going to say (attempt 1)
now.
spring.cloud.stream.gcp.pubsub.bindings.logUserMessage-in-0.consumer.deadLetterPolicy.maxDeliveryAttempts=6 | ||
# We set this to MANUAL ackMode to take control over the ack()/nack()-ing of messages for this demo. | ||
# You do not need to do this in a normal application. | ||
spring.cloud.stream.gcp.pubsub.bindings.logUserMessage-in-0.consumer.ackMode=MANUAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm surprised it worked without this setting for you.
it worked because I |
Kudos, SonarCloud Quality Gate passed! |
* Enable Dead Letter Queue on subscription created by Spring Cloud Stream * Adds dead letter topic sample and integration test
Bumps [checkstyle](https://github.com/checkstyle/checkstyle) from 8.41.1 to 8.42. - [Release notes](https://github.com/checkstyle/checkstyle/releases) - [Commits](checkstyle/checkstyle@checkstyle-8.41.1...checkstyle-8.42) Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Fixes #158
Two commits here - one for the production code and one for the sample.
Since Cloud Stream handles all things related to topic/subscription creation, we need a way to support dead letter topics. This PR expands the
PubSubConsumerProperties
to allow settingdeadLetterPolicy.deadLetterTopic
anddeadLetterPolicy.maxDeliveryAttempts
One of the primary changes is consolidating the
createSubscription
call into its own function which applies the above policy (if available). Because we're potentially creating another topic, I removed theOptional
return type fromensureTopicExists
and instead have it call itself recursively to ensure we nearly always get aTopic
object back and throw an exception in the extremely rare case where we first get anAlreadyExistsException
but then still getnull
back fromgetTopic()
. This is a slight change in behavior because were previously assuming everything was fine and just logging a warning, which we're no longer doing - like I said, though, this should be a pretty rare case.The tests were refactored to remove the JUnit-4-only
@Rule
for exception catching, and use AssertJ instead.The sample code is a combination of the simplicity of the legacy (annotation-based) style and the current functional style, but without the module split out of the other functional sample.
It ends up with 3 functions - one to publish the message, one to receive the message from the main topic, and one to receive the message from the dead letter topic. An integration test ensures this, like all the other tests, starts and runs the application passes as expected.
This PR is already pretty big, so I'll follow up with another PR to pull in some code samples into the refdoc to fix a TODO I put in there earlier.