Skip to content

Cloud Stream Dead Letter Topic support #358

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class PubSubConsumerProperties extends PubSubCommonProperties {

private String subscriptionName = null;

private DeadLetterPolicy deadLetterPolicy = null;

public AckMode getAckMode() {
return ackMode;
}
Expand All @@ -57,4 +59,34 @@ public String getSubscriptionName() {
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}

public DeadLetterPolicy getDeadLetterPolicy() {
return deadLetterPolicy;
}

public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
this.deadLetterPolicy = deadLetterPolicy;
}

public static class DeadLetterPolicy {
private String deadLetterTopic;

private Integer maxDeliveryAttempts;

public String getDeadLetterTopic() {
return deadLetterTopic;
}

public void setDeadLetterTopic(String deadLetterTopic) {
this.deadLetterTopic = deadLetterTopic;
}

public Integer getMaxDeliveryAttempts() {
return maxDeliveryAttempts;
}

public void setMaxDeliveryAttempts(Integer maxDeliveryAttempts) {
this.maxDeliveryAttempts = maxDeliveryAttempts;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package com.google.cloud.spring.stream.binder.pubsub.provisioning;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.spring.pubsub.PubSubAdmin;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubProducerProperties;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
Expand All @@ -37,6 +37,7 @@
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
Expand Down Expand Up @@ -73,7 +74,10 @@ public ConsumerDestination provisionConsumerDestination(String topicName, String

// topicName may be either the short or fully-qualified version.
String topicShortName = TopicName.isParsableFrom(topicName) ? TopicName.parse(topicName).getTopic() : topicName;
Optional<Topic> topic = ensureTopicExists(topicName, properties.getExtension().isAutoCreateResources());
Topic topic = ensureTopicExists(topicName, properties.getExtension().isAutoCreateResources());

PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy = properties.getExtension().getDeadLetterPolicy();
boolean autoCreate = properties.getExtension().isAutoCreateResources();

String subscriptionName;
Subscription subscription;
Expand All @@ -93,19 +97,19 @@ else if (StringUtils.hasText(group)) {
else {
// Generate anonymous random group since one wasn't provided
subscriptionName = "anonymous." + topicShortName + "." + UUID.randomUUID().toString();
subscription = this.pubSubAdmin.createSubscription(subscriptionName, topicName);
subscription = this.createSubscription(subscriptionName, topicName, deadLetterPolicy, autoCreate);
this.anonymousGroupSubscriptionNames.add(subscriptionName);
}

if (subscription == null) {
if (properties.getExtension().isAutoCreateResources()) {
this.pubSubAdmin.createSubscription(subscriptionName, topicName);
this.createSubscription(subscriptionName, topicName, deadLetterPolicy, autoCreate);
}
else {
throw new ProvisioningException("Non-existing '" + subscriptionName + "' subscription.");
}
}
else if (topic.isPresent() && !subscription.getTopic().equals(topic.get().getName())) {
else if (!subscription.getTopic().equals(topic.getName())) {
throw new ProvisioningException(
"Existing '" + subscriptionName + "' subscription is for a different topic '"
+ subscription.getTopic() + "'.");
Expand All @@ -124,23 +128,47 @@ public void afterUnbindConsumer(ConsumerDestination destination) {
}
}

private Optional<Topic> ensureTopicExists(String topicName, boolean autoCreate) {
Topic ensureTopicExists(String topicName, boolean autoCreate) {
Topic topic = this.pubSubAdmin.getTopic(topicName);
if (topic == null) {
if (autoCreate) {
try {
topic = this.pubSubAdmin.createTopic(topicName);
}
catch (AlreadyExistsException alreadyExistsException) {
// Ignore concurrent topic creation - we're good as long as topic was created and exists
LOGGER.info("Failed to auto-create topic '" + topicName + "' because it already exists.");
return ensureTopicExists(topicName, false);
}
}
else {
throw new ProvisioningException("Non-existing '" + topicName + "' topic.");
}
}

return Optional.ofNullable(topic);
return topic;
}

private Subscription createSubscription(String subscriptionName, String topicName,
PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy,
boolean autoCreate) {
Subscription.Builder builder = Subscription.newBuilder()
.setName(subscriptionName)
.setTopic(topicName);

if (deadLetterPolicy != null) {
String dlTopicName = deadLetterPolicy.getDeadLetterTopic();
Assert.hasText(dlTopicName, "Dead letter policy cannot have null or empty topic");

Topic dlTopic = ensureTopicExists(dlTopicName, autoCreate);

DeadLetterPolicy.Builder dlpBuilder = DeadLetterPolicy.newBuilder().setDeadLetterTopic(dlTopic.getName());

Integer maxAttempts = deadLetterPolicy.getMaxDeliveryAttempts();
if (maxAttempts != null && maxAttempts > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would just let negative values for maxAttempts through. Let it fail there, or maybe there'll be special meaning for negative values.

dlpBuilder.setMaxDeliveryAttempts(maxAttempts);
}
builder.setDeadLetterPolicy(dlpBuilder);
}

return this.pubSubAdmin.createSubscription(builder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,25 @@

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.spring.pubsub.PubSubAdmin;
import com.google.cloud.spring.pubsub.support.PubSubSubscriptionUtils;
import com.google.cloud.spring.pubsub.support.PubSubTopicUtils;
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.provisioning.ProvisioningException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -64,23 +65,16 @@ public class PubSubChannelProvisionerTests {
// class under test
PubSubChannelProvisioner pubSubChannelProvisioner;

/**
* Used to check exception messages and types.
*/
@Rule
public ExpectedException expectedEx = ExpectedException.none();

@Before
public void setup() {
when(this.pubSubAdminMock.getSubscription(any())).thenReturn(null);
doAnswer(invocation ->
Subscription.newBuilder()
.setName("projects/test-project/subscriptions/" + invocation.getArgument(0))
.setTopic(invocation.getArgument(1, String.class).startsWith("projects/") ?
invocation.getArgument(1) :
"projects/test-project/topics/" + invocation.getArgument(1))
.build()
).when(this.pubSubAdminMock).createSubscription(any(), any());
doAnswer(invocation -> {
Subscription.Builder arg = invocation.getArgument(0, Subscription.Builder.class);
return Subscription.newBuilder()
.setName(PubSubSubscriptionUtils.toProjectSubscriptionName(arg.getName(), "test-project").toString())
.setTopic(PubSubTopicUtils.toTopicName(arg.getTopic(), "test-project").toString())
.build();
}).when(this.pubSubAdminMock).createSubscription(any());
doAnswer(invocation ->
Topic.newBuilder().setName("projects/test-project/topics/" + invocation.getArgument(0)).build()
).when(this.pubSubAdminMock).getTopic(any());
Expand All @@ -97,7 +91,10 @@ public void testProvisionConsumerDestination_specifiedGroup() {

assertThat(result.getName()).isEqualTo("topic_A.group_A");

verify(this.pubSubAdminMock).createSubscription("topic_A.group_A", "topic_A");
ArgumentCaptor<Subscription.Builder> argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock).createSubscription(argCaptor.capture());
assertThat(argCaptor.getValue().getName()).isEqualTo("topic_A.group_A");
assertThat(argCaptor.getValue().getTopic()).isEqualTo("topic_A");
}

@Test
Expand All @@ -111,7 +108,10 @@ public void testProvisionConsumerDestination_specifiedGroupTopicInDifferentProje

assertThat(result.getName()).isEqualTo("topic_A.group_A");

verify(this.pubSubAdminMock).createSubscription("topic_A.group_A", "projects/differentProject/topics/topic_A");
ArgumentCaptor<Subscription.Builder> argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock).createSubscription(argCaptor.capture());
assertThat(argCaptor.getValue().getName()).isEqualTo("topic_A.group_A");
assertThat(argCaptor.getValue().getTopic()).isEqualTo("projects/differentProject/topics/topic_A");
}

@Test
Expand All @@ -127,37 +127,35 @@ public void testProvisionConsumerDestination_customSubscription() {

@Test
public void testProvisionConsumerDestination_noTopicException() {
this.expectedEx.expect(ProvisioningException.class);
this.expectedEx.expectMessage("Non-existing 'topic_A' topic.");

when(this.pubSubConsumerProperties.isAutoCreateResources()).thenReturn(false);
when(this.pubSubAdminMock.getTopic("topic_A")).thenReturn(null);

PubSubConsumerDestination result = (PubSubConsumerDestination) this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties);
assertThatExceptionOfType(ProvisioningException.class)
.isThrownBy(() -> this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties))
.withMessage("Non-existing 'topic_A' topic.");
}

@Test
public void testProvisionConsumerDestination_noSubscriptionException() {
this.expectedEx.expect(ProvisioningException.class);
this.expectedEx.expectMessage("Non-existing 'topic_A.group_A' subscription.");

when(this.pubSubConsumerProperties.isAutoCreateResources()).thenReturn(false);

PubSubConsumerDestination result = (PubSubConsumerDestination) this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties);
assertThatExceptionOfType(ProvisioningException.class)
.isThrownBy(() -> this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties))
.withMessage("Non-existing 'topic_A.group_A' subscription.");
}

@Test
public void testProvisionConsumerDestination_wrongTopicException() {
this.expectedEx.expect(ProvisioningException.class);
this.expectedEx.expectMessage("Existing 'topic_A.group_A' subscription is for a different topic 'topic_B'.");

when(this.pubSubConsumerProperties.isAutoCreateResources()).thenReturn(false);
when(this.pubSubAdminMock.getSubscription("topic_A.group_A")).thenReturn(Subscription.newBuilder().setTopic("topic_B").build());
when(this.pubSubAdminMock.getSubscription("topic_A.group_A")).thenReturn(
Subscription.newBuilder().setTopic("topic_B").build());

PubSubConsumerDestination result = (PubSubConsumerDestination) this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties);
assertThatExceptionOfType(ProvisioningException.class)
.isThrownBy(() -> this.pubSubChannelProvisioner
.provisionConsumerDestination("topic_A", "group_A", this.properties))
.withMessage("Existing 'topic_A.group_A' subscription is for a different topic 'topic_B'.");
}

@Test
Expand All @@ -172,7 +170,35 @@ public void testProvisionConsumerDestination_anonymousGroup() {

assertThat(result.getName()).matches(subscriptionNameRegex);

verify(this.pubSubAdminMock).createSubscription(matches(subscriptionNameRegex), eq("topic_A"));
ArgumentCaptor<Subscription.Builder> argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock).createSubscription(argCaptor.capture());
assertThat(argCaptor.getValue().getName()).matches(subscriptionNameRegex);
assertThat(argCaptor.getValue().getTopic()).isEqualTo("topic_A");
}

@Test
public void testProvisionConsumerDestination_deadLetterQueue() {
PubSubConsumerProperties.DeadLetterPolicy dlp = new PubSubConsumerProperties.DeadLetterPolicy();
dlp.setDeadLetterTopic("deadLetterTopic");
dlp.setMaxDeliveryAttempts(12);
when(this.pubSubConsumerProperties.getDeadLetterPolicy()).thenReturn(dlp);

when(this.pubSubAdminMock.getTopic("deadLetterTopic")).thenReturn(null);
when(this.pubSubAdminMock.createTopic("deadLetterTopic"))
.thenReturn(Topic.newBuilder().setName("projects/test-project/topics/deadLetterTopic").build());

this.pubSubChannelProvisioner.provisionConsumerDestination("topic_A", "group_A", this.properties);

ArgumentCaptor<Subscription.Builder> argCaptor = ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock).createSubscription(argCaptor.capture());
Subscription.Builder sb = argCaptor.getValue();
assertThat(sb.getName()).isEqualTo("topic_A.group_A");
assertThat(sb.getTopic()).isEqualTo("topic_A");
assertThat(sb.getDeadLetterPolicy()).isNotNull();
DeadLetterPolicy policy = sb.getDeadLetterPolicy();
assertThat(policy.getDeadLetterTopic()).isEqualTo("projects/test-project/topics/deadLetterTopic");
assertThat(policy.getMaxDeliveryAttempts()).isEqualTo(12);

}

@Test
Expand Down Expand Up @@ -209,10 +235,22 @@ public void testAfterUnbindConsumer_nonAnonymous() {
@Test
public void testProvisionConsumerDestination_concurrentTopicCreation() {
when(this.pubSubAdminMock.createTopic(any())).thenThrow(AlreadyExistsException.class);
when(this.pubSubAdminMock.getTopic("already_existing_topic")).thenReturn(null);
when(this.pubSubAdminMock.getTopic("already_existing_topic"))
.thenReturn(null)
.thenReturn(Topic.newBuilder().setName("already_existing_topic").build());

// Ensure no exceptions occur if topic already exists on create call
assertThat(this.pubSubChannelProvisioner
.provisionConsumerDestination("already_existing_topic", "group1", this.properties)).isNotNull();
}

@Test
public void testProvisionConsumerDestination_recursiveExistCalls() {
when(this.pubSubAdminMock.getTopic("new_topic")).thenReturn(null);
when(this.pubSubAdminMock.createTopic(any())).thenThrow(AlreadyExistsException.class);

// Ensure no infinite loop on recursive call
assertThatExceptionOfType(ProvisioningException.class)
.isThrownBy(() -> this.pubSubChannelProvisioner.ensureTopicExists("new_topic", true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ public Subscription createSubscription(String subscriptionName, String topicName
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param builder a Subscription.Builder straight from the client API library that exposes all available knobs and
* levers
* levers. The name and topic fields will be expanded to fully qualified names (i.e.
* "projects/my-project/topic/my-topic") if they are not already.
* @return the created subscription
*/
public Subscription createSubscription(Subscription.Builder builder) {
Expand Down
3 changes: 2 additions & 1 deletion spring-cloud-gcp-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@
<module>spring-cloud-gcp-vision-api-sample</module>
<module>spring-cloud-gcp-integration-storage-sample</module>
<module>spring-cloud-gcp-pubsub-bus-config-sample</module>
<module>spring-cloud-gcp-pubsub-stream-sample</module>
<module>spring-cloud-gcp-pubsub-stream-deadLetterTopic-sample</module>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that camel case is going against the naming convention for our modules.
spring-cloud-gcp-pubsub-stream-dead-letter-topic-sample?

<module>spring-cloud-gcp-pubsub-stream-functional-sample</module>
<module>spring-cloud-gcp-pubsub-stream-polling-sample</module>
<module>spring-cloud-gcp-pubsub-stream-sample</module>
<module>spring-cloud-gcp-pubsub-reactive-sample</module>
<module>spring-cloud-gcp-integration-pubsub-json-sample</module>
<module>spring-cloud-gcp-security-iap-sample</module>
Expand Down
Loading