From 6b74853d9b0f0c2fb80afe467e0d8a367557577f Mon Sep 17 00:00:00 2001 From: Travis Tomsu Date: Tue, 9 Mar 2021 14:39:08 -0500 Subject: [PATCH 1/7] Enable Dead Letter Queue on subscription created by Spring Cloud Stream --- .../properties/PubSubConsumerProperties.java | 32 +++++ .../PubSubChannelProvisioner.java | 46 +++++-- .../PubSubChannelProvisionerTests.java | 114 ++++++++++++------ .../cloud/spring/pubsub/PubSubAdmin.java | 3 +- 4 files changed, 147 insertions(+), 48 deletions(-) diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/properties/PubSubConsumerProperties.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/properties/PubSubConsumerProperties.java index 29dde202bd..76a2860255 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/properties/PubSubConsumerProperties.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/properties/PubSubConsumerProperties.java @@ -34,6 +34,8 @@ public class PubSubConsumerProperties extends PubSubCommonProperties { private String subscriptionName = null; + private DeadLetterPolicy deadLetterPolicy = null; + public AckMode getAckMode() { return ackMode; } @@ -57,4 +59,34 @@ public String getSubscriptionName() { public void setSubscriptionName(String subscriptionName) { this.subscriptionName = subscriptionName; } + + public DeadLetterPolicy getDeadLetterPolicy() { + return deadLetterPolicy; + } + + public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) { + this.deadLetterPolicy = deadLetterPolicy; + } + + public static class DeadLetterPolicy { + private String deadLetterTopic; + + private Integer maxDeliveryAttempts; + + public String getDeadLetterTopic() { + return deadLetterTopic; + } + + public void setDeadLetterTopic(String deadLetterTopic) { + this.deadLetterTopic = deadLetterTopic; + } + + public Integer getMaxDeliveryAttempts() { + return maxDeliveryAttempts; + } + + public void setMaxDeliveryAttempts(Integer maxDeliveryAttempts) { + this.maxDeliveryAttempts = maxDeliveryAttempts; + } + } } diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java index da73294210..622f42b925 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java @@ -17,7 +17,6 @@ package com.google.cloud.spring.stream.binder.pubsub.provisioning; import java.util.HashSet; -import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -25,6 +24,7 @@ import com.google.cloud.spring.pubsub.PubSubAdmin; import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties; import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubProducerProperties; +import com.google.pubsub.v1.DeadLetterPolicy; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; @@ -37,6 +37,7 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.cloud.stream.provisioning.ProvisioningException; import org.springframework.cloud.stream.provisioning.ProvisioningProvider; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** @@ -73,7 +74,10 @@ public ConsumerDestination provisionConsumerDestination(String topicName, String // topicName may be either the short or fully-qualified version. String topicShortName = TopicName.isParsableFrom(topicName) ? TopicName.parse(topicName).getTopic() : topicName; - Optional topic = ensureTopicExists(topicName, properties.getExtension().isAutoCreateResources()); + Topic topic = ensureTopicExists(topicName, properties.getExtension().isAutoCreateResources()); + + PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy = properties.getExtension().getDeadLetterPolicy(); + boolean autoCreate = properties.getExtension().isAutoCreateResources(); String subscriptionName; Subscription subscription; @@ -93,19 +97,19 @@ else if (StringUtils.hasText(group)) { else { // Generate anonymous random group since one wasn't provided subscriptionName = "anonymous." + topicShortName + "." + UUID.randomUUID().toString(); - subscription = this.pubSubAdmin.createSubscription(subscriptionName, topicName); + subscription = this.createSubscription(subscriptionName, topicName, deadLetterPolicy, autoCreate); this.anonymousGroupSubscriptionNames.add(subscriptionName); } if (subscription == null) { if (properties.getExtension().isAutoCreateResources()) { - this.pubSubAdmin.createSubscription(subscriptionName, topicName); + this.createSubscription(subscriptionName, topicName, deadLetterPolicy, autoCreate); } else { throw new ProvisioningException("Non-existing '" + subscriptionName + "' subscription."); } } - else if (topic.isPresent() && !subscription.getTopic().equals(topic.get().getName())) { + else if (!subscription.getTopic().equals(topic.getName())) { throw new ProvisioningException( "Existing '" + subscriptionName + "' subscription is for a different topic '" + subscription.getTopic() + "'."); @@ -124,7 +128,7 @@ public void afterUnbindConsumer(ConsumerDestination destination) { } } - private Optional ensureTopicExists(String topicName, boolean autoCreate) { + Topic ensureTopicExists(String topicName, boolean autoCreate) { Topic topic = this.pubSubAdmin.getTopic(topicName); if (topic == null) { if (autoCreate) { @@ -132,8 +136,7 @@ private Optional ensureTopicExists(String topicName, boolean autoCreate) topic = this.pubSubAdmin.createTopic(topicName); } catch (AlreadyExistsException alreadyExistsException) { - // Ignore concurrent topic creation - we're good as long as topic was created and exists - LOGGER.info("Failed to auto-create topic '" + topicName + "' because it already exists."); + return ensureTopicExists(topicName, false); } } else { @@ -141,6 +144,31 @@ private Optional ensureTopicExists(String topicName, boolean autoCreate) } } - return Optional.ofNullable(topic); + return topic; + } + + private Subscription createSubscription(String subscriptionName, String topicName, + PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy, + boolean autoCreate) { + Subscription.Builder builder = Subscription.newBuilder() + .setName(subscriptionName) + .setTopic(topicName); + + if (deadLetterPolicy != null) { + String dlTopicName = deadLetterPolicy.getDeadLetterTopic(); + Assert.hasText(dlTopicName, "Dead letter policy cannot have null or empty topic"); + + Topic dlTopic = ensureTopicExists(dlTopicName, autoCreate); + + DeadLetterPolicy.Builder dlpBuilder = DeadLetterPolicy.newBuilder().setDeadLetterTopic(dlTopic.getName()); + + Integer maxAttempts = deadLetterPolicy.getMaxDeliveryAttempts(); + if (maxAttempts != null && maxAttempts > 0) { + dlpBuilder.setMaxDeliveryAttempts(maxAttempts); + } + builder.setDeadLetterPolicy(dlpBuilder); + } + + return this.pubSubAdmin.createSubscription(builder); } } diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java index 9394b6cfec..4f1ab78c48 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisionerTests.java @@ -18,14 +18,16 @@ import com.google.api.gax.rpc.AlreadyExistsException; import com.google.cloud.spring.pubsub.PubSubAdmin; +import com.google.cloud.spring.pubsub.support.PubSubSubscriptionUtils; +import com.google.cloud.spring.pubsub.support.PubSubTopicUtils; import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties; +import com.google.pubsub.v1.DeadLetterPolicy; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.Topic; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -33,9 +35,8 @@ import org.springframework.cloud.stream.provisioning.ProvisioningException; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.matches; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -64,23 +65,16 @@ public class PubSubChannelProvisionerTests { // class under test PubSubChannelProvisioner pubSubChannelProvisioner; - /** - * Used to check exception messages and types. - */ - @Rule - public ExpectedException expectedEx = ExpectedException.none(); - @Before public void setup() { when(this.pubSubAdminMock.getSubscription(any())).thenReturn(null); - doAnswer(invocation -> - Subscription.newBuilder() - .setName("projects/test-project/subscriptions/" + invocation.getArgument(0)) - .setTopic(invocation.getArgument(1, String.class).startsWith("projects/") ? - invocation.getArgument(1) : - "projects/test-project/topics/" + invocation.getArgument(1)) - .build() - ).when(this.pubSubAdminMock).createSubscription(any(), any()); + doAnswer(invocation -> { + Subscription.Builder arg = invocation.getArgument(0, Subscription.Builder.class); + return Subscription.newBuilder() + .setName(PubSubSubscriptionUtils.toProjectSubscriptionName(arg.getName(), "test-project").toString()) + .setTopic(PubSubTopicUtils.toTopicName(arg.getTopic(), "test-project").toString()) + .build(); + }).when(this.pubSubAdminMock).createSubscription(any()); doAnswer(invocation -> Topic.newBuilder().setName("projects/test-project/topics/" + invocation.getArgument(0)).build() ).when(this.pubSubAdminMock).getTopic(any()); @@ -97,7 +91,10 @@ public void testProvisionConsumerDestination_specifiedGroup() { assertThat(result.getName()).isEqualTo("topic_A.group_A"); - verify(this.pubSubAdminMock).createSubscription("topic_A.group_A", "topic_A"); + ArgumentCaptor argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class); + verify(this.pubSubAdminMock).createSubscription(argCaptor.capture()); + assertThat(argCaptor.getValue().getName()).isEqualTo("topic_A.group_A"); + assertThat(argCaptor.getValue().getTopic()).isEqualTo("topic_A"); } @Test @@ -111,7 +108,10 @@ public void testProvisionConsumerDestination_specifiedGroupTopicInDifferentProje assertThat(result.getName()).isEqualTo("topic_A.group_A"); - verify(this.pubSubAdminMock).createSubscription("topic_A.group_A", "projects/differentProject/topics/topic_A"); + ArgumentCaptor argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class); + verify(this.pubSubAdminMock).createSubscription(argCaptor.capture()); + assertThat(argCaptor.getValue().getName()).isEqualTo("topic_A.group_A"); + assertThat(argCaptor.getValue().getTopic()).isEqualTo("projects/differentProject/topics/topic_A"); } @Test @@ -127,37 +127,35 @@ public void testProvisionConsumerDestination_customSubscription() { @Test public void testProvisionConsumerDestination_noTopicException() { - this.expectedEx.expect(ProvisioningException.class); - this.expectedEx.expectMessage("Non-existing 'topic_A' topic."); - when(this.pubSubConsumerProperties.isAutoCreateResources()).thenReturn(false); when(this.pubSubAdminMock.getTopic("topic_A")).thenReturn(null); - PubSubConsumerDestination result = (PubSubConsumerDestination) this.pubSubChannelProvisioner - .provisionConsumerDestination("topic_A", "group_A", this.properties); + assertThatExceptionOfType(ProvisioningException.class) + .isThrownBy(() -> this.pubSubChannelProvisioner + .provisionConsumerDestination("topic_A", "group_A", this.properties)) + .withMessage("Non-existing 'topic_A' topic."); } @Test public void testProvisionConsumerDestination_noSubscriptionException() { - this.expectedEx.expect(ProvisioningException.class); - this.expectedEx.expectMessage("Non-existing 'topic_A.group_A' subscription."); - when(this.pubSubConsumerProperties.isAutoCreateResources()).thenReturn(false); - PubSubConsumerDestination result = (PubSubConsumerDestination) this.pubSubChannelProvisioner - .provisionConsumerDestination("topic_A", "group_A", this.properties); + assertThatExceptionOfType(ProvisioningException.class) + .isThrownBy(() -> this.pubSubChannelProvisioner + .provisionConsumerDestination("topic_A", "group_A", this.properties)) + .withMessage("Non-existing 'topic_A.group_A' subscription."); } @Test public void testProvisionConsumerDestination_wrongTopicException() { - this.expectedEx.expect(ProvisioningException.class); - this.expectedEx.expectMessage("Existing 'topic_A.group_A' subscription is for a different topic 'topic_B'."); - when(this.pubSubConsumerProperties.isAutoCreateResources()).thenReturn(false); - when(this.pubSubAdminMock.getSubscription("topic_A.group_A")).thenReturn(Subscription.newBuilder().setTopic("topic_B").build()); + when(this.pubSubAdminMock.getSubscription("topic_A.group_A")).thenReturn( + Subscription.newBuilder().setTopic("topic_B").build()); - PubSubConsumerDestination result = (PubSubConsumerDestination) this.pubSubChannelProvisioner - .provisionConsumerDestination("topic_A", "group_A", this.properties); + assertThatExceptionOfType(ProvisioningException.class) + .isThrownBy(() -> this.pubSubChannelProvisioner + .provisionConsumerDestination("topic_A", "group_A", this.properties)) + .withMessage("Existing 'topic_A.group_A' subscription is for a different topic 'topic_B'."); } @Test @@ -172,7 +170,35 @@ public void testProvisionConsumerDestination_anonymousGroup() { assertThat(result.getName()).matches(subscriptionNameRegex); - verify(this.pubSubAdminMock).createSubscription(matches(subscriptionNameRegex), eq("topic_A")); + ArgumentCaptor argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class); + verify(this.pubSubAdminMock).createSubscription(argCaptor.capture()); + assertThat(argCaptor.getValue().getName()).matches(subscriptionNameRegex); + assertThat(argCaptor.getValue().getTopic()).isEqualTo("topic_A"); + } + + @Test + public void testProvisionConsumerDestination_deadLetterQueue() { + PubSubConsumerProperties.DeadLetterPolicy dlp = new PubSubConsumerProperties.DeadLetterPolicy(); + dlp.setDeadLetterTopic("deadLetterTopic"); + dlp.setMaxDeliveryAttempts(12); + when(this.pubSubConsumerProperties.getDeadLetterPolicy()).thenReturn(dlp); + + when(this.pubSubAdminMock.getTopic("deadLetterTopic")).thenReturn(null); + when(this.pubSubAdminMock.createTopic("deadLetterTopic")) + .thenReturn(Topic.newBuilder().setName("projects/test-project/topics/deadLetterTopic").build()); + + this.pubSubChannelProvisioner.provisionConsumerDestination("topic_A", "group_A", this.properties); + + ArgumentCaptor argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class); + verify(this.pubSubAdminMock).createSubscription(argCaptor.capture()); + Subscription.Builder sb = argCaptor.getValue(); + assertThat(sb.getName()).isEqualTo("topic_A.group_A"); + assertThat(sb.getTopic()).isEqualTo("topic_A"); + assertThat(sb.getDeadLetterPolicy()).isNotNull(); + DeadLetterPolicy policy = sb.getDeadLetterPolicy(); + assertThat(policy.getDeadLetterTopic()).isEqualTo("projects/test-project/topics/deadLetterTopic"); + assertThat(policy.getMaxDeliveryAttempts()).isEqualTo(12); + } @Test @@ -209,10 +235,22 @@ public void testAfterUnbindConsumer_nonAnonymous() { @Test public void testProvisionConsumerDestination_concurrentTopicCreation() { when(this.pubSubAdminMock.createTopic(any())).thenThrow(AlreadyExistsException.class); - when(this.pubSubAdminMock.getTopic("already_existing_topic")).thenReturn(null); + when(this.pubSubAdminMock.getTopic("already_existing_topic")) + .thenReturn(null) + .thenReturn(Topic.newBuilder().setName("already_existing_topic").build()); // Ensure no exceptions occur if topic already exists on create call assertThat(this.pubSubChannelProvisioner .provisionConsumerDestination("already_existing_topic", "group1", this.properties)).isNotNull(); } + + @Test + public void testProvisionConsumerDestination_recursiveExistCalls() { + when(this.pubSubAdminMock.getTopic("new_topic")).thenReturn(null); + when(this.pubSubAdminMock.createTopic(any())).thenThrow(AlreadyExistsException.class); + + // Ensure no infinite loop on recursive call + assertThatExceptionOfType(ProvisioningException.class) + .isThrownBy(() -> this.pubSubChannelProvisioner.ensureTopicExists("new_topic", true)); + } } diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/PubSubAdmin.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/PubSubAdmin.java index e6b800a136..2b4646a54f 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/PubSubAdmin.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/PubSubAdmin.java @@ -256,7 +256,8 @@ public Subscription createSubscription(String subscriptionName, String topicName * Create a new subscription on Google Cloud Pub/Sub. * * @param builder a Subscription.Builder straight from the client API library that exposes all available knobs and - * levers + * levers. The name and topic fields will be expanded to fully qualified names (i.e. + * "projects/my-project/topic/my-topic") if they are not already. * @return the created subscription */ public Subscription createSubscription(Subscription.Builder builder) { From 4d17f1c21e39c228242f327fe043a38d7579f60c Mon Sep 17 00:00:00 2001 From: Travis Tomsu Date: Wed, 10 Mar 2021 15:15:56 -0500 Subject: [PATCH 2/7] Adds dead letter topic sample and integration test --- spring-cloud-gcp-samples/pom.xml | 3 +- .../README.adoc | 38 ++++++++ .../pom.xml | 78 ++++++++++++++++ .../PubSubDeadLetterTopicApplication.java | 33 +++++++ .../main/java/com/example/SinkExample.java | 62 +++++++++++++ .../main/java/com/example/SourceExample.java | 68 ++++++++++++++ .../main/java/com/example/UserMessage.java | 36 +++++++ .../src/main/resources/application.properties | 20 ++++ .../src/main/resources/static/index.html | 15 +++ ...adLetterTopicSampleAppIntegrationTest.java | 93 +++++++++++++++++++ .../src/test/resources/logback-test.xml | 14 +++ 11 files changed, 459 insertions(+), 1 deletion(-) create mode 100644 spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/README.adoc create mode 100644 spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/pom.xml create mode 100644 spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/PubSubDeadLetterTopicApplication.java create mode 100644 spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java create mode 100644 spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SourceExample.java create mode 100644 spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/UserMessage.java create mode 100644 spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/application.properties create mode 100644 spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/static/index.html create mode 100644 spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java create mode 100644 spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/resources/logback-test.xml diff --git a/spring-cloud-gcp-samples/pom.xml b/spring-cloud-gcp-samples/pom.xml index 9b6fa4b7f0..4afe58ecba 100644 --- a/spring-cloud-gcp-samples/pom.xml +++ b/spring-cloud-gcp-samples/pom.xml @@ -50,9 +50,10 @@ spring-cloud-gcp-vision-api-sample spring-cloud-gcp-integration-storage-sample spring-cloud-gcp-pubsub-bus-config-sample - spring-cloud-gcp-pubsub-stream-sample + spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample spring-cloud-gcp-pubsub-stream-functional-sample spring-cloud-gcp-pubsub-stream-polling-sample + spring-cloud-gcp-pubsub-stream-sample spring-cloud-gcp-pubsub-reactive-sample spring-cloud-gcp-integration-pubsub-json-sample spring-cloud-gcp-security-iap-sample diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/README.adoc b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/README.adoc new file mode 100644 index 0000000000..644ba90b5f --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/README.adoc @@ -0,0 +1,38 @@ += Spring Cloud GCP Stream for Pub/Sub Code Sample + +This code sample demonstrates how to use the Spring Cloud Stream binder for Google Cloud Pub/Sub with Dead Letter Topics. + +The sample app prompts a user for a message and user name. +That data is added to a `UserMessage` object, together with the time of message creation, and is sent through Google Cloud Pub/Sub to a sink that `nack()`s the message. +Pub/Sub will attempt to redeliver the message up to maximum number of retries before routing it to the configured dead letter topic. +This sample app also listens to this dead letter topic, and logs any messages received. + +If the topics for the sink, source, and dead letter topic do not exist, the binder will automatically create them in Google Cloud Pub/Sub based on the values in link:src/main/resources/application.properties[application.properties]. + +== Running the code sample + +1. Configure your credentials and project ID by following link:../../docs/src/main/asciidoc/core.adoc#project-id[these instructions]. ++ +Alternatively, if you have the https://cloud.google.com/sdk/[Google Cloud SDK] installed and initialized, and are logged in with https://developers.google.com/identity/protocols/application-default-credentials[application default credentials], Spring will auto-discover those parameters for you. + +2. Set your project ID using the `spring.cloud.gcp.project-id` property in link:src/main/resources/application.properties[application.properties] or use the `gcloud config set project [PROJECT_ID]` Cloud SDK command. + +3. In the link:src/main/resources/application.properties[application.properties] file, a topic and a dead letter topic are already preconfigured for you. +The topics will be created in your account if they do not already exist. + +4. Run the `mvn clean spring-boot:run` in the root of the code sample to get the app running. + +5. Browse `localhost:8080`, type in a message, a user name, and press the `Post it!` button. + +6. Verify in your app's logs that a similar message was posted: ++ +``` +Publishing message from batman +Nacking message 1 from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message 2 from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message 3 from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message 4 from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message 5 from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message 6 from batman at 2021-03-10T15:33:00.479420: to the batcave! +Received message on dead letter topic from batman: to the batcave! +``` diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/pom.xml b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/pom.xml new file mode 100644 index 0000000000..265e584022 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/pom.xml @@ -0,0 +1,78 @@ + + + + + spring-cloud-gcp-samples + com.google.cloud + 2.0.2-SNAPSHOT + + 4.0.0 + + spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample + Spring Cloud GCP Code Sample - Pub/Sub Binder for Spring Cloud Stream with Dead Letter Topic + + + + + + com.google.cloud + spring-cloud-gcp-dependencies + ${project.version} + pom + import + + + + + + + com.google.cloud + spring-cloud-gcp-starter-pubsub + + + com.google.cloud + spring-cloud-gcp-pubsub-stream-binder + + + org.springframework.boot + spring-boot-starter-web + + + + + org.awaitility + awaitility + 3.1.6 + test + + + junit + junit + test + + + org.springframework.boot + spring-boot-starter-test + test + + + commons-io + commons-io + 2.5 + test + + + + + + + maven-deploy-plugin + + true + + + + + diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/PubSubDeadLetterTopicApplication.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/PubSubDeadLetterTopicApplication.java new file mode 100644 index 0000000000..d56f645f52 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/PubSubDeadLetterTopicApplication.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Sample application for a binder. + * + * @author Travis Tomsu + */ +@SpringBootApplication +public class PubSubDeadLetterTopicApplication { + + public static void main(String[] args) { + SpringApplication.run(PubSubDeadLetterTopicApplication.class, args); + } +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java new file mode 100644 index 0000000000..e129bf6a69 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java @@ -0,0 +1,62 @@ +/* + * Copyright 2017-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import java.util.function.Consumer; + +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage; +import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders; +import lombok.extern.slf4j.Slf4j; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * Example of a sink for the sample app. + * + * @author Travis Tomsu + */ +@Configuration +@Slf4j +public class SinkExample { + + @Bean + public Consumer> logUserMessage() { + return message -> { + UserMessage userMessage = message.getPayload(); + BasicAcknowledgeablePubsubMessage nackable = + (BasicAcknowledgeablePubsubMessage) message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE); + Assert.notNull(nackable, "Message was missing original message"); + Integer deliveryAttempt = Subscriber.getDeliveryAttempt(nackable.getPubsubMessage()); + + // Typically you won't nack() _every_ message, but this demonstrates a max number of retries before the + // message is routed to the dead letter queue. + log.info("Nacking message {} from {} at {}: {}", deliveryAttempt, userMessage.getUsername(), + userMessage.getCreatedAt(), userMessage.getBody()); + nackable.nack(); + }; + } + + @Bean + public Consumer deadLetterMessages() { + return userMessage -> log.info("Received message on dead letter topic from {}: {}", userMessage.getUsername(), + userMessage.getBody()); + } +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SourceExample.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SourceExample.java new file mode 100644 index 0000000000..9ba2eaecd9 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SourceExample.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import java.time.LocalDateTime; +import java.util.function.Supplier; + +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * An example source for the sample app. + * + * @author Travis Tomsu + */ +@RestController +@Slf4j +public class SourceExample { + + @Autowired + private Sinks.Many postOffice; + + @PostMapping("/newMessage") + public UserMessage sendMessage( + @RequestParam("messageBody") String messageBody, + @RequestParam("username") String username) { + UserMessage userMessage = new UserMessage(messageBody, username, LocalDateTime.now()); + log.info("Publishing message from {}", username); + this.postOffice.tryEmitNext(userMessage); + return userMessage; + } + + @Configuration + public static class SourceConfig { + + @Bean + public Sinks.Many postOffice() { + return Sinks.many().unicast().onBackpressureBuffer(); + } + + @Bean + Supplier> generateUserMessages(Sinks.Many postOffice) { + return postOffice::asFlux; + } + } +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/UserMessage.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/UserMessage.java new file mode 100644 index 0000000000..1fe67eaa63 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/UserMessage.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import java.time.LocalDateTime; + +import lombok.Data; + +/** + * A user message for the sample app. + * + * @author Travis Tomsu + */ +@Data +public class UserMessage { + + private final String body; + + private final String username; + + private final LocalDateTime createdAt; +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/application.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/application.properties new file mode 100644 index 0000000000..f096e723af --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/application.properties @@ -0,0 +1,20 @@ +# We have multiple functional endpoints in our application, so we must explicitly list them. +spring.cloud.function.definition=generateUserMessages;logUserMessage;deadLetterMessages + +# The application's Source sends messages sent through the generateUserMessages method to this topic. +spring.cloud.stream.bindings.generateUserMessages-out-0.destination=my-main-topic + +# The applications' Sink receives messages throug the logUserMessage Consumer. The anonymous subscriber is configured to +# send any unprocessable messages to the dead letter topic after a set number of retry attempts. +spring.cloud.stream.bindings.logUserMessage-in-0.destination=my-main-topic +spring.cloud.stream.gcp.pubsub.bindings.logUserMessage-in-0.consumer.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic +spring.cloud.stream.gcp.pubsub.bindings.logUserMessage-in-0.consumer.deadLetterPolicy.maxDeliveryAttempts=6 + +# The application also subscribes to the dead letter topic to show that it's just another normal topic. +spring.cloud.stream.bindings.deadLetterMessages-in-0.destination=my-dead-letter-topic + +spring.cloud.stream.gcp.pubsub.default.consumer.auto-create-resources=true + +#spring.cloud.gcp.project-id=[YOUR_GCP_PROJECT_ID] +#spring.cloud.gcp.credentials.location=file:[LOCAL_PATH_TO_CREDENTIALS] + diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/static/index.html b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/static/index.html new file mode 100644 index 0000000000..9dd6b615e4 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/static/index.html @@ -0,0 +1,15 @@ + + + + + Spring Cloud GCP Pub/Sub Stream Binder Code Sample + + + New message:
+
+ Message body:
+ Username:
+ +
+ + diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java new file mode 100644 index 0000000000..7598a1a970 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2017-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.output.TeeOutputStream; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; + +/** + * @author Travis Tomsu + * @since 2.0.2 + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@DirtiesContext +public class PubSubDeadLetterTopicSampleAppIntegrationTest { + + @Autowired + private TestRestTemplate restTemplate; + + private static PrintStream systemOut; + + private static ByteArrayOutputStream baos; + + @BeforeClass + public static void prepare() { + assumeThat("PUB/SUB-sample integration tests are disabled. Please use '-Dit.pubsub=true' to enable them.", + System.getProperty("it.pubsub"), is("true")); + + systemOut = System.out; + baos = new ByteArrayOutputStream(); + TeeOutputStream out = new TeeOutputStream(systemOut, baos); + System.setOut(new PrintStream(out)); + } + + @AfterClass + public static void bringBack() { + System.setOut(systemOut); + } + + @Test + public void testSample_deadLetterHandling() { + MultiValueMap map = new LinkedMultiValueMap<>(); + String message = "test message " + UUID.randomUUID(); + + map.add("messageBody", message); + map.add("username", "testUserName"); + + this.restTemplate.postForObject("/newMessage", map, String.class); + + await().atMost(60, TimeUnit.SECONDS) + .pollDelay(3, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(baos.toString()) + .contains("Nacking message 1 ") + .contains("Nacking message 6") + .contains("Received message on dead letter topic") + .contains(message)); + } +} diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/resources/logback-test.xml b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..5535de3c96 --- /dev/null +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/resources/logback-test.xml @@ -0,0 +1,14 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + From 7d3213c9ee82f7261fa00096f452e4c4f76f915b Mon Sep 17 00:00:00 2001 From: Travis Tomsu Date: Thu, 11 Mar 2021 09:31:12 -0500 Subject: [PATCH 3/7] dzou comments --- .../PubSubChannelProvisioner.java | 22 +++-- .../inbound/PubSubInboundChannelAdapter.java | 1 - .../pubsub/support/GcpPubSubHeaders.java | 21 +++++ .../pubsub/support/GcpPubSubHeadersTest.java | 80 +++++++++++++++++++ .../main/java/com/example/SinkExample.java | 10 +-- 5 files changed, 115 insertions(+), 19 deletions(-) create mode 100644 spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java index 622f42b925..d801e9ce80 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java @@ -130,21 +130,19 @@ public void afterUnbindConsumer(ConsumerDestination destination) { Topic ensureTopicExists(String topicName, boolean autoCreate) { Topic topic = this.pubSubAdmin.getTopic(topicName); - if (topic == null) { - if (autoCreate) { - try { - topic = this.pubSubAdmin.createTopic(topicName); - } - catch (AlreadyExistsException alreadyExistsException) { - return ensureTopicExists(topicName, false); - } + if (topic != null) { + return topic; + } + + if (autoCreate) { + try { + return this.pubSubAdmin.createTopic(topicName); } - else { - throw new ProvisioningException("Non-existing '" + topicName + "' topic."); + catch (AlreadyExistsException alreadyExistsException) { + return ensureTopicExists(topicName, false); } } - - return topic; + throw new ProvisioningException("Non-existing '" + topicName + "' topic."); } private Subscription createSubscription(String subscriptionName, String topicName, diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/integration/inbound/PubSubInboundChannelAdapter.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/integration/inbound/PubSubInboundChannelAdapter.java index b98f810adb..0c02a09977 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/integration/inbound/PubSubInboundChannelAdapter.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/integration/inbound/PubSubInboundChannelAdapter.java @@ -118,7 +118,6 @@ protected void doStop() { super.doStop(); } - @SuppressWarnings("deprecation") private void consumeMessage(ConvertedBasicAcknowledgeablePubsubMessage message) { Map messageHeaders = this.headerMapper.toHeaders(message.getPubsubMessage().getAttributesMap()); diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java index 618da35b4e..81ac6977c1 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java @@ -16,6 +16,10 @@ package com.google.cloud.spring.pubsub.support; +import java.util.Optional; + +import org.springframework.messaging.Message; + /** * Google Cloud Platform internal headers for Spring Messaging messages. * @@ -25,6 +29,8 @@ */ public abstract class GcpPubSubHeaders { + private GcpPubSubHeaders() {} + private static final String PREFIX = "gcp_pubsub_"; /** @@ -36,4 +42,19 @@ public abstract class GcpPubSubHeaders { * The original message header text. */ public static final String ORIGINAL_MESSAGE = PREFIX + "original_message"; + + /** + * A simple utility method for pulling the {@link #ORIGINAL_MESSAGE} header out of a {@link Message}. + * + * @param message The Spring Message that was converted by a + * {@link com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter}. + * @return An Optional possibly containing a BasicAcknowledgeablePubsubMessage for acking and nacking. + */ + public static Optional getOriginalMessage(Message message) { + Object originalMessage = message.getHeaders().get(ORIGINAL_MESSAGE); + if (originalMessage instanceof BasicAcknowledgeablePubsubMessage){ + return Optional.of((BasicAcknowledgeablePubsubMessage) originalMessage); + } + return Optional.empty(); + } } diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java new file mode 100644 index 0000000000..5bd4c86b00 --- /dev/null +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java @@ -0,0 +1,80 @@ +/* + * Copyright $today.year-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spring.pubsub.support; + +import java.util.Collections; + +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.Test; + +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.concurrent.ListenableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +public class GcpPubSubHeadersTest { + + @Test + public void getOriginalMessage_emptyHeaders() { + Message m = new GenericMessage<>("batman"); + assertThat(GcpPubSubHeaders.getOriginalMessage(m)).isEmpty(); + } + + @Test + public void getOriginalMessage_wrongType() { + Message m = new GenericMessage<>("batman", + Collections.singletonMap(GcpPubSubHeaders.ORIGINAL_MESSAGE, 101)); + assertThat(GcpPubSubHeaders.getOriginalMessage(m)).isEmpty(); + } + + @Test + public void getOriginalMessage() { + Message m = new GenericMessage<>("batman", + Collections.singletonMap(GcpPubSubHeaders.ORIGINAL_MESSAGE, + new TestBasicAcknowledgeablePubsubMessage())); + assertThat(GcpPubSubHeaders.getOriginalMessage(m)) + .isNotEmpty() + .get() + .isInstanceOf(BasicAcknowledgeablePubsubMessage.class); + } + + + private static class TestBasicAcknowledgeablePubsubMessage implements BasicAcknowledgeablePubsubMessage { + + @Override + public ProjectSubscriptionName getProjectSubscriptionName() { + return null; + } + + @Override + public PubsubMessage getPubsubMessage() { + return null; + } + + @Override + public ListenableFuture ack() { + return null; + } + + @Override + public ListenableFuture nack() { + return null; + } + } +} \ No newline at end of file diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java index e129bf6a69..365890dc73 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java @@ -26,7 +26,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; -import org.springframework.util.Assert; /** * Example of a sink for the sample app. @@ -41,13 +40,12 @@ public class SinkExample { public Consumer> logUserMessage() { return message -> { UserMessage userMessage = message.getPayload(); - BasicAcknowledgeablePubsubMessage nackable = - (BasicAcknowledgeablePubsubMessage) message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE); - Assert.notNull(nackable, "Message was missing original message"); + BasicAcknowledgeablePubsubMessage nackable = GcpPubSubHeaders.getOriginalMessage(message) + .orElseThrow(() -> new IllegalStateException("Could not find original PubSubMessage.")); Integer deliveryAttempt = Subscriber.getDeliveryAttempt(nackable.getPubsubMessage()); - // Typically you won't nack() _every_ message, but this demonstrates a max number of retries before the - // message is routed to the dead letter queue. + // Typically you wouldn't nack() every message, but this demonstrates the Pub/Sub system retrying delivery + // some number of times before the message is routed to the dead letter queue. log.info("Nacking message {} from {} at {}: {}", deliveryAttempt, userMessage.getUsername(), userMessage.getCreatedAt(), userMessage.getBody()); nackable.nack(); From 85600af1b23b6aac3b36c75ac4aacbcc7880648c Mon Sep 17 00:00:00 2001 From: Travis Tomsu Date: Thu, 11 Mar 2021 09:46:40 -0500 Subject: [PATCH 4/7] checkstyle --- .../google/cloud/spring/pubsub/support/GcpPubSubHeaders.java | 5 +++-- .../cloud/spring/pubsub/support/GcpPubSubHeadersTest.java | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java index 81ac6977c1..9540f6f24a 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java @@ -29,7 +29,8 @@ */ public abstract class GcpPubSubHeaders { - private GcpPubSubHeaders() {} + private GcpPubSubHeaders() { + } private static final String PREFIX = "gcp_pubsub_"; @@ -52,7 +53,7 @@ private GcpPubSubHeaders() {} */ public static Optional getOriginalMessage(Message message) { Object originalMessage = message.getHeaders().get(ORIGINAL_MESSAGE); - if (originalMessage instanceof BasicAcknowledgeablePubsubMessage){ + if (originalMessage instanceof BasicAcknowledgeablePubsubMessage) { return Optional.of((BasicAcknowledgeablePubsubMessage) originalMessage); } return Optional.empty(); diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java index 5bd4c86b00..11c57a396d 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java @@ -1,5 +1,5 @@ /* - * Copyright $today.year-2021 the original author or authors. + * Copyright 2021-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,4 +77,4 @@ public ListenableFuture nack() { return null; } } -} \ No newline at end of file +} From 73538a97b7d0b47ab41b0ac4c3c4b53df9d4a372 Mon Sep 17 00:00:00 2001 From: Travis Tomsu Date: Thu, 11 Mar 2021 14:15:25 -0500 Subject: [PATCH 5/7] mike and elena comments --- .../PubSubChannelProvisioner.java | 21 +++++++------ .../pubsub/support/GcpPubSubHeadersTest.java | 30 ++----------------- spring-cloud-gcp-samples/pom.xml | 2 +- .../README.adoc | 2 +- .../pom.xml | 2 +- .../PubSubDeadLetterTopicApplication.java | 0 .../main/java/com/example/SinkExample.java | 2 +- .../main/java/com/example/SourceExample.java | 0 .../main/java/com/example/UserMessage.java | 0 .../src/main/resources/application.properties | 0 .../src/main/resources/static/index.html | 0 ...adLetterTopicSampleAppIntegrationTest.java | 4 +-- .../src/test/resources/logback-test.xml | 0 13 files changed, 20 insertions(+), 43 deletions(-) rename spring-cloud-gcp-samples/{spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample => spring-cloud-gcp-pubsub-stream-dead-letter-sample}/README.adoc (97%) rename spring-cloud-gcp-samples/{spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample => spring-cloud-gcp-pubsub-stream-dead-letter-sample}/pom.xml (97%) rename spring-cloud-gcp-samples/{spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample => spring-cloud-gcp-pubsub-stream-dead-letter-sample}/src/main/java/com/example/PubSubDeadLetterTopicApplication.java (100%) rename spring-cloud-gcp-samples/{spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample => spring-cloud-gcp-pubsub-stream-dead-letter-sample}/src/main/java/com/example/SinkExample.java (95%) rename spring-cloud-gcp-samples/{spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample => spring-cloud-gcp-pubsub-stream-dead-letter-sample}/src/main/java/com/example/SourceExample.java (100%) rename spring-cloud-gcp-samples/{spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample => spring-cloud-gcp-pubsub-stream-dead-letter-sample}/src/main/java/com/example/UserMessage.java (100%) rename spring-cloud-gcp-samples/{spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample => spring-cloud-gcp-pubsub-stream-dead-letter-sample}/src/main/resources/application.properties (100%) rename spring-cloud-gcp-samples/{spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample => spring-cloud-gcp-pubsub-stream-dead-letter-sample}/src/main/resources/static/index.html (100%) rename spring-cloud-gcp-samples/{spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample => spring-cloud-gcp-pubsub-stream-dead-letter-sample}/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java (96%) rename spring-cloud-gcp-samples/{spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample => spring-cloud-gcp-pubsub-stream-dead-letter-sample}/src/test/resources/logback-test.xml (100%) diff --git a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java index d801e9ce80..a93b2b71ad 100644 --- a/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java +++ b/spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/provisioning/PubSubChannelProvisioner.java @@ -72,16 +72,17 @@ public ProducerDestination provisionProducerDestination(String topic, public ConsumerDestination provisionConsumerDestination(String topicName, String group, ExtendedConsumerProperties properties) { - // topicName may be either the short or fully-qualified version. - String topicShortName = TopicName.isParsableFrom(topicName) ? TopicName.parse(topicName).getTopic() : topicName; - Topic topic = ensureTopicExists(topicName, properties.getExtension().isAutoCreateResources()); - - PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy = properties.getExtension().getDeadLetterPolicy(); - boolean autoCreate = properties.getExtension().isAutoCreateResources(); - String subscriptionName; Subscription subscription; + String customName = properties.getExtension().getSubscriptionName(); + boolean autoCreate = properties.getExtension().isAutoCreateResources(); + PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy = properties.getExtension().getDeadLetterPolicy(); + + // topicName may be either the short or fully-qualified version. + String topicShortName = TopicName.isParsableFrom(topicName) ? TopicName.parse(topicName).getTopic() : topicName; + Topic topic = ensureTopicExists(topicName, autoCreate); + if (StringUtils.hasText(customName)) { if (StringUtils.hasText(group)) { LOGGER.warn("Either subscriptionName or group can be specified, but not both. " + @@ -102,7 +103,7 @@ else if (StringUtils.hasText(group)) { } if (subscription == null) { - if (properties.getExtension().isAutoCreateResources()) { + if (autoCreate) { this.createSubscription(subscriptionName, topicName, deadLetterPolicy, autoCreate); } else { @@ -139,6 +140,8 @@ Topic ensureTopicExists(String topicName, boolean autoCreate) { return this.pubSubAdmin.createTopic(topicName); } catch (AlreadyExistsException alreadyExistsException) { + // Sometimes 2+ instances of this application will race to create the topic, so this ensures we retry + // in the non-winning instances. In the rare case it fails, we throw an exception. return ensureTopicExists(topicName, false); } } @@ -161,7 +164,7 @@ private Subscription createSubscription(String subscriptionName, String topicNam DeadLetterPolicy.Builder dlpBuilder = DeadLetterPolicy.newBuilder().setDeadLetterTopic(dlTopic.getName()); Integer maxAttempts = deadLetterPolicy.getMaxDeliveryAttempts(); - if (maxAttempts != null && maxAttempts > 0) { + if (maxAttempts != null) { dlpBuilder.setMaxDeliveryAttempts(maxAttempts); } builder.setDeadLetterPolicy(dlpBuilder); diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java index 11c57a396d..00e80c397d 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeadersTest.java @@ -18,15 +18,13 @@ import java.util.Collections; -import com.google.pubsub.v1.ProjectSubscriptionName; -import com.google.pubsub.v1.PubsubMessage; import org.junit.Test; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; -import org.springframework.util.concurrent.ListenableFuture; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; public class GcpPubSubHeadersTest { @@ -47,34 +45,10 @@ public void getOriginalMessage_wrongType() { public void getOriginalMessage() { Message m = new GenericMessage<>("batman", Collections.singletonMap(GcpPubSubHeaders.ORIGINAL_MESSAGE, - new TestBasicAcknowledgeablePubsubMessage())); + mock(BasicAcknowledgeablePubsubMessage.class))); assertThat(GcpPubSubHeaders.getOriginalMessage(m)) .isNotEmpty() .get() .isInstanceOf(BasicAcknowledgeablePubsubMessage.class); } - - - private static class TestBasicAcknowledgeablePubsubMessage implements BasicAcknowledgeablePubsubMessage { - - @Override - public ProjectSubscriptionName getProjectSubscriptionName() { - return null; - } - - @Override - public PubsubMessage getPubsubMessage() { - return null; - } - - @Override - public ListenableFuture ack() { - return null; - } - - @Override - public ListenableFuture nack() { - return null; - } - } } diff --git a/spring-cloud-gcp-samples/pom.xml b/spring-cloud-gcp-samples/pom.xml index 4afe58ecba..c1dcc3e179 100644 --- a/spring-cloud-gcp-samples/pom.xml +++ b/spring-cloud-gcp-samples/pom.xml @@ -50,7 +50,7 @@ spring-cloud-gcp-vision-api-sample spring-cloud-gcp-integration-storage-sample spring-cloud-gcp-pubsub-bus-config-sample - spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample + spring-cloud-gcp-pubsub-stream-dead-letter-sample spring-cloud-gcp-pubsub-stream-functional-sample spring-cloud-gcp-pubsub-stream-polling-sample spring-cloud-gcp-pubsub-stream-sample diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/README.adoc b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/README.adoc similarity index 97% rename from spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/README.adoc rename to spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/README.adoc index 644ba90b5f..ac6ef74076 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/README.adoc +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/README.adoc @@ -1,4 +1,4 @@ -= Spring Cloud GCP Stream for Pub/Sub Code Sample += Spring Cloud GCP Stream for Pub/Sub Dead Letter Topic Code Sample This code sample demonstrates how to use the Spring Cloud Stream binder for Google Cloud Pub/Sub with Dead Letter Topics. diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/pom.xml b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/pom.xml similarity index 97% rename from spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/pom.xml rename to spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/pom.xml index 265e584022..b8ca231397 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/pom.xml +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/pom.xml @@ -10,7 +10,7 @@ 4.0.0 - spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample + spring-cloud-gcp-pubsub-stream-dead-letter-sample Spring Cloud GCP Code Sample - Pub/Sub Binder for Spring Cloud Stream with Dead Letter Topic diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/PubSubDeadLetterTopicApplication.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/java/com/example/PubSubDeadLetterTopicApplication.java similarity index 100% rename from spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/PubSubDeadLetterTopicApplication.java rename to spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/java/com/example/PubSubDeadLetterTopicApplication.java diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/java/com/example/SinkExample.java similarity index 95% rename from spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java rename to spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/java/com/example/SinkExample.java index 365890dc73..4f4434aaf7 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SinkExample.java +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/java/com/example/SinkExample.java @@ -46,7 +46,7 @@ public Consumer> logUserMessage() { // Typically you wouldn't nack() every message, but this demonstrates the Pub/Sub system retrying delivery // some number of times before the message is routed to the dead letter queue. - log.info("Nacking message {} from {} at {}: {}", deliveryAttempt, userMessage.getUsername(), + log.info("Nacking message (attempt {}) from {} at {}: {}", deliveryAttempt, userMessage.getUsername(), userMessage.getCreatedAt(), userMessage.getBody()); nackable.nack(); }; diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SourceExample.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/java/com/example/SourceExample.java similarity index 100% rename from spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/SourceExample.java rename to spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/java/com/example/SourceExample.java diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/UserMessage.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/java/com/example/UserMessage.java similarity index 100% rename from spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/java/com/example/UserMessage.java rename to spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/java/com/example/UserMessage.java diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/application.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/resources/application.properties similarity index 100% rename from spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/application.properties rename to spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/resources/application.properties diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/static/index.html b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/resources/static/index.html similarity index 100% rename from spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/main/resources/static/index.html rename to spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/resources/static/index.html diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java similarity index 96% rename from spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java rename to spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java index 7598a1a970..b1fba25600 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/test/java/com/example/PubSubDeadLetterTopicSampleAppIntegrationTest.java @@ -85,8 +85,8 @@ public void testSample_deadLetterHandling() { await().atMost(60, TimeUnit.SECONDS) .pollDelay(3, TimeUnit.SECONDS) .untilAsserted(() -> assertThat(baos.toString()) - .contains("Nacking message 1 ") - .contains("Nacking message 6") + .contains("Nacking message (attempt 1)") + .contains("Nacking message (attempt 6)") .contains("Received message on dead letter topic") .contains(message)); } diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/resources/logback-test.xml b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/test/resources/logback-test.xml similarity index 100% rename from spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample/src/test/resources/logback-test.xml rename to spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/test/resources/logback-test.xml From de6e32356b8e582a432834937169bf680b4f80b4 Mon Sep 17 00:00:00 2001 From: Travis Tomsu Date: Thu, 11 Mar 2021 14:22:56 -0500 Subject: [PATCH 6/7] ackMode=MANUAL in sample --- .../src/main/resources/application.properties | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/resources/application.properties b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/resources/application.properties index f096e723af..c613071ac4 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/resources/application.properties +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/src/main/resources/application.properties @@ -9,6 +9,9 @@ spring.cloud.stream.bindings.generateUserMessages-out-0.destination=my-main-topi spring.cloud.stream.bindings.logUserMessage-in-0.destination=my-main-topic spring.cloud.stream.gcp.pubsub.bindings.logUserMessage-in-0.consumer.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic spring.cloud.stream.gcp.pubsub.bindings.logUserMessage-in-0.consumer.deadLetterPolicy.maxDeliveryAttempts=6 +# We set this to MANUAL ackMode to take control over the ack()/nack()-ing of messages for this demo. +# You do not need to do this in a normal application. +spring.cloud.stream.gcp.pubsub.bindings.logUserMessage-in-0.consumer.ackMode=MANUAL # The application also subscribes to the dead letter topic to show that it's just another normal topic. spring.cloud.stream.bindings.deadLetterMessages-in-0.destination=my-dead-letter-topic From b10d008075233a3c25a90a29766bff1b1e4d2e74 Mon Sep 17 00:00:00 2001 From: Travis Tomsu Date: Mon, 15 Mar 2021 11:06:40 -0400 Subject: [PATCH 7/7] update output in docs --- .../README.adoc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/README.adoc b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/README.adoc index ac6ef74076..53652d9f83 100644 --- a/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/README.adoc +++ b/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-stream-dead-letter-sample/README.adoc @@ -4,7 +4,7 @@ This code sample demonstrates how to use the Spring Cloud Stream binder for Goog The sample app prompts a user for a message and user name. That data is added to a `UserMessage` object, together with the time of message creation, and is sent through Google Cloud Pub/Sub to a sink that `nack()`s the message. -Pub/Sub will attempt to redeliver the message up to maximum number of retries before routing it to the configured dead letter topic. +Pub/Sub will attempt to redeliver the message up to maximum number of retries before routing it to the configured dead letter topic.\ This sample app also listens to this dead letter topic, and logs any messages received. If the topics for the sink, source, and dead letter topic do not exist, the binder will automatically create them in Google Cloud Pub/Sub based on the values in link:src/main/resources/application.properties[application.properties]. @@ -28,11 +28,11 @@ The topics will be created in your account if they do not already exist. + ``` Publishing message from batman -Nacking message 1 from batman at 2021-03-10T15:33:00.479420: to the batcave! -Nacking message 2 from batman at 2021-03-10T15:33:00.479420: to the batcave! -Nacking message 3 from batman at 2021-03-10T15:33:00.479420: to the batcave! -Nacking message 4 from batman at 2021-03-10T15:33:00.479420: to the batcave! -Nacking message 5 from batman at 2021-03-10T15:33:00.479420: to the batcave! -Nacking message 6 from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message (attempt 1) from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message (attempt 2) from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message (attempt 3) from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message (attempt 4) from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message (attempt 5) from batman at 2021-03-10T15:33:00.479420: to the batcave! +Nacking message (attempt 6) from batman at 2021-03-10T15:33:00.479420: to the batcave! Received message on dead letter topic from batman: to the batcave! ```