Skip to content

Enable Pub/Sub message ordering for publishing #408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ The delay threshold to use for batching.
After this amount of time has elapsed (counting from the first element added), the elements will be wrapped up in a batch and sent. | No | 1 ms (batching off)
| `spring.cloud.gcp.pubsub.publisher.batching.enabled`|
Enables batching. | No | false
| `spring.cloud.gcp.pubsub.publisher.enable-message-ordering`|
Enables message ordering. | No | false
|===

==== GRPC Connection Settings
Expand Down Expand Up @@ -176,6 +178,16 @@ include::{project-root}/spring-cloud-gcp-autoconfigure/src/test/java/com/google/

By default, the `SimplePubSubMessageConverter` is used to convert payloads of type `byte[]`, `ByteString`, `ByteBuffer`, and `String` to Pub/Sub messages.

===== Ordering messages

If you are relying on message converters and would like to provide an ordering key, use the `GcpPubSubHeaders.ORDERING_KEY` header.
You will also need to make sure to enable message ordering on the publisher via the `spring.cloud.gcp.pubsub.publisher.enable-message-ordering` property.

[source,java,indent=0]
----
include::{project-root}/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/it/PubSubTemplateDocumentationIntegrationTests.java[tag=publish_ordering]
----

==== Subscribing to a subscription

Google Cloud Pub/Sub allows many subscriptions to be associated to the same topic.
Expand Down
5 changes: 4 additions & 1 deletion docs/src/main/asciidoc/spring-integration-pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,12 @@ include::{project-root}/spring-cloud-gcp-autoconfigure/src/test/java/com/google/

These channel adapters contain header mappers that allow you to map, or filter out, headers from Spring to Google Cloud Pub/Sub messages, and vice-versa.
By default, the inbound channel adapter maps every header on the Google Cloud Pub/Sub messages to the Spring messages produced by the adapter.
The outbound channel adapter maps every header from Spring messages into Google Cloud Pub/Sub ones, except the ones added by Spring, like headers with key `"id"`, `"timestamp"` and `"gcp_pubsub_acknowledgement"`.
The outbound channel adapter maps every header from Spring messages into Google Cloud Pub/Sub ones, except the ones added by Spring and some special headers, like headers with key `"id"`, `"timestamp"`, `"gcp_pubsub_acknowledgement"`, and `"gcp_pubsub_ordering_key"`.
In the process, the outbound mapper also converts the value of the headers into string.

Note that you can provide the `GcpPubSubHeaders.ORDERING_KEY` (`"gcp_pubsub_ordering_key"`) header, which will be automatically mapped to `PubsubMessage.orderingKey` property, and excluded from the headers in the published message.
Remember to set `spring.cloud.gcp.pubsub.publisher.enable-message-ordering` to `true`, if you are publishing messages with this header.

Each adapter declares a `setHeaderMapper()` method to let you further customize which headers you want to map from Spring to Google Cloud Pub/Sub, and vice-versa.

For example, to filter out headers `"foo"`, `"bar"` and all headers starting with the prefix "prefix_", you can use `setHeaderMapper()` along with the `PubSubHeaderMapper` implementation provided by this module.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ public PublisherFactory defaultPublisherFactory(
factory.setChannelProvider(publisherTransportChannelProvider);
retrySettings.ifAvailable(factory::setRetrySettings);
batchingSettings.ifAvailable(factory::setBatchingSettings);
factory.setEnableMessageOrdering(gcpPubSubProperties.getPublisher().getEnableMessageOrdering());
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public static class Publisher {
*/
private final Batching batching = new Batching();

/**
* Enable message ordering setting.
*/
private Boolean enableMessageOrdering;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not a primitive boolean? Default state is false; would be no need to mess with nulls.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 yeah would also prefer the primitive boolean to avoid implying that not setting it is different than setting it to false.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes we follow the pattern that is to allow not setting it, so that the client library default can be used automatically. Otherwise, we have to maintain the default, which in this case probably doesn't matter.


public Batching getBatching() {
return this.batching;
}
Expand All @@ -141,6 +146,14 @@ public int getExecutorThreads() {
public void setExecutorThreads(int executorThreads) {
this.executorThreads = executorThreads;
}

public Boolean getEnableMessageOrdering() {
return enableMessageOrdering;
}

public void setEnableMessageOrdering(Boolean enableMessageOrdering) {
this.enableMessageOrdering = enableMessageOrdering;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
import com.google.cloud.spring.pubsub.support.converter.ConvertedAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.converter.JacksonPubSubMessageConverter;
import com.google.cloud.spring.pubsub.support.converter.PubSubMessageConverter;
Expand Down Expand Up @@ -65,7 +66,8 @@
public class PubSubTemplateDocumentationIntegrationTests {

private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period=0")
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period=0",
"spring.cloud.gcp.pubsub.publisher.enable-message-ordering=true")
.withConfiguration(AutoConfigurations.of(GcpContextAutoConfiguration.class,
GcpPubSubAutoConfiguration.class));

Expand Down Expand Up @@ -93,6 +95,35 @@ public void testCreatePublishPullNextAndDelete() {
});
}

@Test
public void testCreatePublishPullNextAndDelete_ordering() {
pubSubTest((AssertableApplicationContext context, PubSubTemplate pubSubTemplate, String subscriptionName, String topicName) -> {
//tag::publish_ordering[]
Map<String, String> headers = Collections.singletonMap(GcpPubSubHeaders.ORDERING_KEY, "key1");
pubSubTemplate.publish(topicName, "message1", headers).get();
pubSubTemplate.publish(topicName, "message2", headers).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment to explain that messages with the same ordering key will be delivered in the same order they were sent.

//end::publish_ordering[]

// message1
await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
PubsubMessage pubsubMessage = pubSubTemplate.pullNext(subscriptionName);

assertThat(pubsubMessage).isNotNull();
assertThat(pubsubMessage.getData()).isEqualTo(ByteString.copyFromUtf8("message1"));
assertThat(pubsubMessage.getAttributesCount()).isZero();
});

// message2
await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
PubsubMessage pubsubMessage = pubSubTemplate.pullNext(subscriptionName);

assertThat(pubsubMessage).isNotNull();
assertThat(pubsubMessage.getData()).isEqualTo(ByteString.copyFromUtf8("message2"));
assertThat(pubsubMessage.getAttributesCount()).isZero();
});
});
}


private void pubSubTest(PubSubTest pubSubTest, Class... configClass) {
ApplicationContextRunner contextRunner = configClass.length == 0 ? this.contextRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class DefaultPublisherFactory implements PublisherFactory {

private BatchingSettings batchingSettings;

private Boolean enableMessageOrdering;

/**
* Create {@link DefaultPublisherFactory} instance based on the provided {@link GcpProjectIdProvider}.
* <p>The {@link GcpProjectIdProvider} must not be null, neither provide an empty {@code projectId}.
Expand Down Expand Up @@ -123,36 +125,22 @@ public void setBatchingSettings(BatchingSettings batchingSettings) {
this.batchingSettings = batchingSettings;
}

/**
* Set whether message ordering should be enabled on the publisher.
* @param enableMessageOrdering whether to enable message ordering
*/
public void setEnableMessageOrdering(Boolean enableMessageOrdering) {
this.enableMessageOrdering = enableMessageOrdering;
}

@Override
public Publisher createPublisher(String topic) {
return this.publishers.computeIfAbsent(topic, key -> {
try {
Publisher.Builder publisherBuilder =
Publisher.newBuilder(PubSubTopicUtils.toTopicName(topic, this.projectId));

if (this.executorProvider != null) {
publisherBuilder.setExecutorProvider(this.executorProvider);
}

if (this.channelProvider != null) {
publisherBuilder.setChannelProvider(this.channelProvider);
}

if (this.credentialsProvider != null) {
publisherBuilder.setCredentialsProvider(this.credentialsProvider);
}

if (this.headerProvider != null) {
publisherBuilder.setHeaderProvider(this.headerProvider);
}

if (this.retrySettings != null) {
publisherBuilder.setRetrySettings(this.retrySettings);
}

if (this.batchingSettings != null) {
publisherBuilder.setBatchingSettings(this.batchingSettings);
}
applyPublisherSettings(publisherBuilder);

return publisherBuilder.build();
}
Expand All @@ -163,6 +151,36 @@ public Publisher createPublisher(String topic) {
});
}

void applyPublisherSettings(Publisher.Builder publisherBuilder) {
if (this.executorProvider != null) {
publisherBuilder.setExecutorProvider(this.executorProvider);
}

if (this.channelProvider != null) {
publisherBuilder.setChannelProvider(this.channelProvider);
}

if (this.credentialsProvider != null) {
publisherBuilder.setCredentialsProvider(this.credentialsProvider);
}

if (this.headerProvider != null) {
publisherBuilder.setHeaderProvider(this.headerProvider);
}

if (this.retrySettings != null) {
publisherBuilder.setRetrySettings(this.retrySettings);
}

if (this.batchingSettings != null) {
publisherBuilder.setBatchingSettings(this.batchingSettings);
}

if (this.enableMessageOrdering != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's primitive, then we don't need to check for null here, just straight set the value, or, to be extra safe, check if true.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is whether we want to allow not setting the value at all and letting the client lib control the default.

publisherBuilder.setEnableMessageOrdering(this.enableMessageOrdering);
}
}

Map<String, Publisher> getCache() {
return this.publishers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ private GcpPubSubHeaders() {
*/
public static final String ORIGINAL_MESSAGE = PREFIX + "original_message";

/**
* The Pub/Sub message ordering key.
*/
public static final String ORDERING_KEY = PREFIX + "ordering_key";

/**
* A simple utility method for pulling the {@link #ORIGINAL_MESSAGE} header out of a {@link Message}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,8 @@ public JacksonPubSubMessageConverter(ObjectMapper objectMapper) {
@Override
public PubsubMessage toPubSubMessage(Object payload, Map<String, String> headers) {
try {
PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(this.objectMapper.writeValueAsBytes(payload)));

if (headers != null) {
pubsubMessageBuilder.putAllAttributes(headers);
}

return pubsubMessageBuilder.build();
return byteStringToPubSubMessage(ByteString.copyFrom(
this.objectMapper.writeValueAsBytes(payload)), headers);
}
catch (JsonProcessingException ex) {
throw new PubSubMessageConversionException("JSON serialization of an object of type " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.Map;

import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

/**
Expand All @@ -44,4 +46,20 @@ public interface PubSubMessageConverter {
* @return the object converted from the message's payload
*/
<T> T fromPubSubMessage(PubsubMessage message, Class<T> payloadType);

default PubsubMessage byteStringToPubSubMessage(ByteString payload, Map<String, String> headers) {
PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder()
.setData(payload);

if (headers != null) {
pubsubMessageBuilder.putAllAttributes(headers);

if (headers.containsKey(GcpPubSubHeaders.ORDERING_KEY)) {
pubsubMessageBuilder.removeAttributes(GcpPubSubHeaders.ORDERING_KEY);
pubsubMessageBuilder.setOrderingKey(headers.get(GcpPubSubHeaders.ORDERING_KEY));
}
}

return pubsubMessageBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,7 @@ else if (payload instanceof byte[]) {
payload.getClass().getName() + " to byte[] for sending to Pub/Sub.");
}

PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder()
.setData(convertedPayload);

if (headers != null) {
pubsubMessageBuilder.putAllAttributes(headers);
}

return pubsubMessageBuilder.build();
return byteStringToPubSubMessage(convertedPayload, headers);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.spring.pubsub.integration.outbound;

import java.util.Collections;

import com.google.cloud.spring.core.util.MapBuilder;
import com.google.cloud.spring.pubsub.core.PubSubOperations;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
Expand All @@ -40,7 +42,6 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -86,7 +87,7 @@ public void setUp() {
@Test
public void testPublish() {
this.adapter.handleMessage(this.message);
verify(this.pubSubTemplate, times(1))
verify(this.pubSubTemplate)
.publish(eq("testTopic"), eq("testPayload".getBytes()), anyMap());
}

Expand All @@ -99,7 +100,7 @@ public void testPublishDynamicTopic() {
.put(GcpPubSubHeaders.TOPIC, "dynamicTopic")
.build());
this.adapter.handleMessage(dynamicMessage);
verify(this.pubSubTemplate, times(1))
verify(this.pubSubTemplate)
.publish(eq("dynamicTopic"), eq("testPayload".getBytes()), anyMap());
}

Expand All @@ -114,7 +115,7 @@ public void testSendToExpressionTopic() {
.put("sendToTopic", "expressionTopic")
.build());
this.adapter.handleMessage(expressionMessage);
verify(this.pubSubTemplate, times(1))
verify(this.pubSubTemplate)
.publish(eq("expressionTopic"), eq("testPayload".getBytes()), anyMap());
}

Expand All @@ -125,7 +126,7 @@ public void testPublishSync() {
this.adapter.setPublishTimeoutExpression(timeout);

this.adapter.handleMessage(this.message);
verify(timeout, times(1)).getValue(isNull(), eq(this.message), eq(Long.class));
verify(timeout).getValue(isNull(), eq(this.message), eq(Long.class));
}

@Test
Expand All @@ -148,7 +149,7 @@ public void onSuccess(String result) {

assertThat(this.adapter.getPublishCallback()).isSameAs(callbackSpy);

verify(callbackSpy, times(1)).onSuccess("benfica");
verify(callbackSpy).onSuccess("benfica");
}

@Test
Expand Down Expand Up @@ -227,4 +228,18 @@ public void testSetHeaderMapperWithNull() {

this.adapter.setHeaderMapper(null);
}

@Test
public void testPublishWithOrderingKey() {
this.message = new GenericMessage<byte[]>("testPayload".getBytes(),
new MapBuilder<String, Object>()
.put(GcpPubSubHeaders.ORDERING_KEY, "key1")
.build());

this.adapter.handleMessage(this.message);
verify(this.pubSubTemplate)
.publish("testTopic", "testPayload".getBytes(),
Collections.singletonMap(GcpPubSubHeaders.ORDERING_KEY, "key1"));
}

}
Loading