diff --git a/docs/src/main/asciidoc/pubsub.adoc b/docs/src/main/asciidoc/pubsub.adoc index 8ca6cc5fa6..6dd7695fac 100644 --- a/docs/src/main/asciidoc/pubsub.adoc +++ b/docs/src/main/asciidoc/pubsub.adoc @@ -77,6 +77,8 @@ The delay threshold to use for batching. After this amount of time has elapsed (counting from the first element added), the elements will be wrapped up in a batch and sent. | No | 1 ms (batching off) | `spring.cloud.gcp.pubsub.publisher.batching.enabled`| Enables batching. | No | false +| `spring.cloud.gcp.pubsub.publisher.enable-message-ordering`| +Enables message ordering. | No | false |=== ==== GRPC Connection Settings @@ -176,6 +178,16 @@ include::{project-root}/spring-cloud-gcp-autoconfigure/src/test/java/com/google/ By default, the `SimplePubSubMessageConverter` is used to convert payloads of type `byte[]`, `ByteString`, `ByteBuffer`, and `String` to Pub/Sub messages. +===== Ordering messages + +If you are relying on message converters and would like to provide an ordering key, use the `GcpPubSubHeaders.ORDERING_KEY` header. +You will also need to make sure to enable message ordering on the publisher via the `spring.cloud.gcp.pubsub.publisher.enable-message-ordering` property. + +[source,java,indent=0] +---- +include::{project-root}/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/it/PubSubTemplateDocumentationIntegrationTests.java[tag=publish_ordering] +---- + ==== Subscribing to a subscription Google Cloud Pub/Sub allows many subscriptions to be associated to the same topic. diff --git a/docs/src/main/asciidoc/spring-integration-pubsub.adoc b/docs/src/main/asciidoc/spring-integration-pubsub.adoc index d6f4fb8db0..43d5eb8ca8 100644 --- a/docs/src/main/asciidoc/spring-integration-pubsub.adoc +++ b/docs/src/main/asciidoc/spring-integration-pubsub.adoc @@ -296,9 +296,12 @@ include::{project-root}/spring-cloud-gcp-autoconfigure/src/test/java/com/google/ These channel adapters contain header mappers that allow you to map, or filter out, headers from Spring to Google Cloud Pub/Sub messages, and vice-versa. By default, the inbound channel adapter maps every header on the Google Cloud Pub/Sub messages to the Spring messages produced by the adapter. -The outbound channel adapter maps every header from Spring messages into Google Cloud Pub/Sub ones, except the ones added by Spring, like headers with key `"id"`, `"timestamp"` and `"gcp_pubsub_acknowledgement"`. +The outbound channel adapter maps every header from Spring messages into Google Cloud Pub/Sub ones, except the ones added by Spring and some special headers, like headers with key `"id"`, `"timestamp"`, `"gcp_pubsub_acknowledgement"`, and `"gcp_pubsub_ordering_key"`. In the process, the outbound mapper also converts the value of the headers into string. +Note that you can provide the `GcpPubSubHeaders.ORDERING_KEY` (`"gcp_pubsub_ordering_key"`) header, which will be automatically mapped to `PubsubMessage.orderingKey` property, and excluded from the headers in the published message. +Remember to set `spring.cloud.gcp.pubsub.publisher.enable-message-ordering` to `true`, if you are publishing messages with this header. + Each adapter declares a `setHeaderMapper()` method to let you further customize which headers you want to map from Spring to Google Cloud Pub/Sub, and vice-versa. For example, to filter out headers `"foo"`, `"bar"` and all headers starting with the prefix "prefix_", you can use `setHeaderMapper()` along with the `PubSubHeaderMapper` implementation provided by this module. diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java index 8fe2e5786f..acee30f1ff 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java @@ -309,6 +309,7 @@ public PublisherFactory defaultPublisherFactory( factory.setChannelProvider(publisherTransportChannelProvider); retrySettings.ifAvailable(factory::setRetrySettings); batchingSettings.ifAvailable(factory::setBatchingSettings); + factory.setEnableMessageOrdering(gcpPubSubProperties.getPublisher().getEnableMessageOrdering()); return factory; } diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubProperties.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubProperties.java index e6bf13f9f4..49f92df1e0 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubProperties.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubProperties.java @@ -126,6 +126,11 @@ public static class Publisher { */ private final Batching batching = new Batching(); + /** + * Enable message ordering setting. + */ + private Boolean enableMessageOrdering; + public Batching getBatching() { return this.batching; } @@ -141,6 +146,14 @@ public int getExecutorThreads() { public void setExecutorThreads(int executorThreads) { this.executorThreads = executorThreads; } + + public Boolean getEnableMessageOrdering() { + return enableMessageOrdering; + } + + public void setEnableMessageOrdering(Boolean enableMessageOrdering) { + this.enableMessageOrdering = enableMessageOrdering; + } } /** diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/it/PubSubTemplateDocumentationIntegrationTests.java b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/it/PubSubTemplateDocumentationIntegrationTests.java index 0509bfe609..223253a370 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/it/PubSubTemplateDocumentationIntegrationTests.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/it/PubSubTemplateDocumentationIntegrationTests.java @@ -34,6 +34,7 @@ import com.google.cloud.spring.pubsub.core.PubSubTemplate; import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler; import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage; +import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders; import com.google.cloud.spring.pubsub.support.converter.ConvertedAcknowledgeablePubsubMessage; import com.google.cloud.spring.pubsub.support.converter.JacksonPubSubMessageConverter; import com.google.cloud.spring.pubsub.support.converter.PubSubMessageConverter; @@ -65,7 +66,8 @@ public class PubSubTemplateDocumentationIntegrationTests { private ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withPropertyValues("spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period=0") + .withPropertyValues("spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period=0", + "spring.cloud.gcp.pubsub.publisher.enable-message-ordering=true") .withConfiguration(AutoConfigurations.of(GcpContextAutoConfiguration.class, GcpPubSubAutoConfiguration.class)); @@ -93,6 +95,35 @@ public void testCreatePublishPullNextAndDelete() { }); } + @Test + public void testCreatePublishPullNextAndDelete_ordering() { + pubSubTest((AssertableApplicationContext context, PubSubTemplate pubSubTemplate, String subscriptionName, String topicName) -> { + //tag::publish_ordering[] + Map headers = Collections.singletonMap(GcpPubSubHeaders.ORDERING_KEY, "key1"); + pubSubTemplate.publish(topicName, "message1", headers).get(); + pubSubTemplate.publish(topicName, "message2", headers).get(); + //end::publish_ordering[] + + // message1 + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PubsubMessage pubsubMessage = pubSubTemplate.pullNext(subscriptionName); + + assertThat(pubsubMessage).isNotNull(); + assertThat(pubsubMessage.getData()).isEqualTo(ByteString.copyFromUtf8("message1")); + assertThat(pubsubMessage.getAttributesCount()).isZero(); + }); + + // message2 + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PubsubMessage pubsubMessage = pubSubTemplate.pullNext(subscriptionName); + + assertThat(pubsubMessage).isNotNull(); + assertThat(pubsubMessage.getData()).isEqualTo(ByteString.copyFromUtf8("message2")); + assertThat(pubsubMessage.getAttributesCount()).isZero(); + }); + }); + } + private void pubSubTest(PubSubTest pubSubTest, Class... configClass) { ApplicationContextRunner contextRunner = configClass.length == 0 ? this.contextRunner diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java index 8fc12253b4..f769282364 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java @@ -61,6 +61,8 @@ public class DefaultPublisherFactory implements PublisherFactory { private BatchingSettings batchingSettings; + private Boolean enableMessageOrdering; + /** * Create {@link DefaultPublisherFactory} instance based on the provided {@link GcpProjectIdProvider}. *

The {@link GcpProjectIdProvider} must not be null, neither provide an empty {@code projectId}. @@ -123,6 +125,14 @@ public void setBatchingSettings(BatchingSettings batchingSettings) { this.batchingSettings = batchingSettings; } + /** + * Set whether message ordering should be enabled on the publisher. + * @param enableMessageOrdering whether to enable message ordering + */ + public void setEnableMessageOrdering(Boolean enableMessageOrdering) { + this.enableMessageOrdering = enableMessageOrdering; + } + @Override public Publisher createPublisher(String topic) { return this.publishers.computeIfAbsent(topic, key -> { @@ -130,29 +140,7 @@ public Publisher createPublisher(String topic) { Publisher.Builder publisherBuilder = Publisher.newBuilder(PubSubTopicUtils.toTopicName(topic, this.projectId)); - if (this.executorProvider != null) { - publisherBuilder.setExecutorProvider(this.executorProvider); - } - - if (this.channelProvider != null) { - publisherBuilder.setChannelProvider(this.channelProvider); - } - - if (this.credentialsProvider != null) { - publisherBuilder.setCredentialsProvider(this.credentialsProvider); - } - - if (this.headerProvider != null) { - publisherBuilder.setHeaderProvider(this.headerProvider); - } - - if (this.retrySettings != null) { - publisherBuilder.setRetrySettings(this.retrySettings); - } - - if (this.batchingSettings != null) { - publisherBuilder.setBatchingSettings(this.batchingSettings); - } + applyPublisherSettings(publisherBuilder); return publisherBuilder.build(); } @@ -163,6 +151,36 @@ public Publisher createPublisher(String topic) { }); } + void applyPublisherSettings(Publisher.Builder publisherBuilder) { + if (this.executorProvider != null) { + publisherBuilder.setExecutorProvider(this.executorProvider); + } + + if (this.channelProvider != null) { + publisherBuilder.setChannelProvider(this.channelProvider); + } + + if (this.credentialsProvider != null) { + publisherBuilder.setCredentialsProvider(this.credentialsProvider); + } + + if (this.headerProvider != null) { + publisherBuilder.setHeaderProvider(this.headerProvider); + } + + if (this.retrySettings != null) { + publisherBuilder.setRetrySettings(this.retrySettings); + } + + if (this.batchingSettings != null) { + publisherBuilder.setBatchingSettings(this.batchingSettings); + } + + if (this.enableMessageOrdering != null) { + publisherBuilder.setEnableMessageOrdering(this.enableMessageOrdering); + } + } + Map getCache() { return this.publishers; } diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java index 9540f6f24a..853bb87f66 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java @@ -44,6 +44,11 @@ private GcpPubSubHeaders() { */ public static final String ORIGINAL_MESSAGE = PREFIX + "original_message"; + /** + * The Pub/Sub message ordering key. + */ + public static final String ORDERING_KEY = PREFIX + "ordering_key"; + /** * A simple utility method for pulling the {@link #ORIGINAL_MESSAGE} header out of a {@link Message}. * diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/JacksonPubSubMessageConverter.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/JacksonPubSubMessageConverter.java index 75787be604..68be78f408 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/JacksonPubSubMessageConverter.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/JacksonPubSubMessageConverter.java @@ -48,14 +48,8 @@ public JacksonPubSubMessageConverter(ObjectMapper objectMapper) { @Override public PubsubMessage toPubSubMessage(Object payload, Map headers) { try { - PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(this.objectMapper.writeValueAsBytes(payload))); - - if (headers != null) { - pubsubMessageBuilder.putAllAttributes(headers); - } - - return pubsubMessageBuilder.build(); + return byteStringToPubSubMessage(ByteString.copyFrom( + this.objectMapper.writeValueAsBytes(payload)), headers); } catch (JsonProcessingException ex) { throw new PubSubMessageConversionException("JSON serialization of an object of type " + diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/PubSubMessageConverter.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/PubSubMessageConverter.java index f2f480f11e..cbc8ad654c 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/PubSubMessageConverter.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/PubSubMessageConverter.java @@ -18,6 +18,8 @@ import java.util.Map; +import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders; +import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; /** @@ -44,4 +46,20 @@ public interface PubSubMessageConverter { * @return the object converted from the message's payload */ T fromPubSubMessage(PubsubMessage message, Class payloadType); + + default PubsubMessage byteStringToPubSubMessage(ByteString payload, Map headers) { + PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder() + .setData(payload); + + if (headers != null) { + pubsubMessageBuilder.putAllAttributes(headers); + + if (headers.containsKey(GcpPubSubHeaders.ORDERING_KEY)) { + pubsubMessageBuilder.removeAttributes(GcpPubSubHeaders.ORDERING_KEY); + pubsubMessageBuilder.setOrderingKey(headers.get(GcpPubSubHeaders.ORDERING_KEY)); + } + } + + return pubsubMessageBuilder.build(); + } } diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/SimplePubSubMessageConverter.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/SimplePubSubMessageConverter.java index 3992f0b47d..f4e36d8216 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/SimplePubSubMessageConverter.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/SimplePubSubMessageConverter.java @@ -63,14 +63,7 @@ else if (payload instanceof byte[]) { payload.getClass().getName() + " to byte[] for sending to Pub/Sub."); } - PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder() - .setData(convertedPayload); - - if (headers != null) { - pubsubMessageBuilder.putAllAttributes(headers); - } - - return pubsubMessageBuilder.build(); + return byteStringToPubSubMessage(convertedPayload, headers); } diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandlerTests.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandlerTests.java index c24d1584a6..34db20aa59 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandlerTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandlerTests.java @@ -16,6 +16,8 @@ package com.google.cloud.spring.pubsub.integration.outbound; +import java.util.Collections; + import com.google.cloud.spring.core.util.MapBuilder; import com.google.cloud.spring.pubsub.core.PubSubOperations; import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders; @@ -40,7 +42,6 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -86,7 +87,7 @@ public void setUp() { @Test public void testPublish() { this.adapter.handleMessage(this.message); - verify(this.pubSubTemplate, times(1)) + verify(this.pubSubTemplate) .publish(eq("testTopic"), eq("testPayload".getBytes()), anyMap()); } @@ -99,7 +100,7 @@ public void testPublishDynamicTopic() { .put(GcpPubSubHeaders.TOPIC, "dynamicTopic") .build()); this.adapter.handleMessage(dynamicMessage); - verify(this.pubSubTemplate, times(1)) + verify(this.pubSubTemplate) .publish(eq("dynamicTopic"), eq("testPayload".getBytes()), anyMap()); } @@ -114,7 +115,7 @@ public void testSendToExpressionTopic() { .put("sendToTopic", "expressionTopic") .build()); this.adapter.handleMessage(expressionMessage); - verify(this.pubSubTemplate, times(1)) + verify(this.pubSubTemplate) .publish(eq("expressionTopic"), eq("testPayload".getBytes()), anyMap()); } @@ -125,7 +126,7 @@ public void testPublishSync() { this.adapter.setPublishTimeoutExpression(timeout); this.adapter.handleMessage(this.message); - verify(timeout, times(1)).getValue(isNull(), eq(this.message), eq(Long.class)); + verify(timeout).getValue(isNull(), eq(this.message), eq(Long.class)); } @Test @@ -148,7 +149,7 @@ public void onSuccess(String result) { assertThat(this.adapter.getPublishCallback()).isSameAs(callbackSpy); - verify(callbackSpy, times(1)).onSuccess("benfica"); + verify(callbackSpy).onSuccess("benfica"); } @Test @@ -227,4 +228,18 @@ public void testSetHeaderMapperWithNull() { this.adapter.setHeaderMapper(null); } + + @Test + public void testPublishWithOrderingKey() { + this.message = new GenericMessage("testPayload".getBytes(), + new MapBuilder() + .put(GcpPubSubHeaders.ORDERING_KEY, "key1") + .build()); + + this.adapter.handleMessage(this.message); + verify(this.pubSubTemplate) + .publish("testTopic", "testPayload".getBytes(), + Collections.singletonMap(GcpPubSubHeaders.ORDERING_KEY, "key1")); + } + } diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/converter/JacksonPubSubMessageConverterTests.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/converter/JacksonPubSubMessageConverterTests.java index 49e3f25359..b88554d04a 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/converter/JacksonPubSubMessageConverterTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/converter/JacksonPubSubMessageConverterTests.java @@ -16,10 +16,12 @@ package com.google.cloud.spring.pubsub.support.converter; +import java.util.Collections; + import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders; import com.google.pubsub.v1.PubsubMessage; import org.json.JSONException; -import org.junit.Assert; import org.junit.Test; import org.skyscreamer.jsonassert.JSONAssert; @@ -74,7 +76,27 @@ public void testPojo() throws JSONException { @Test public void testToPubSubMessageWithNullPayload() throws JSONException { PubsubMessage pubsubMessage = this.converter.toPubSubMessage(null, null); - Assert.assertNotNull(pubsubMessage); + assertThat(pubsubMessage).isNotNull(); + assertThat(pubsubMessage.getAttributesCount()).isZero(); + } + + @Test + public void testToPubSubMessageWithOrderingKeyHeader() throws JSONException { + PubsubMessage pubsubMessage = this.converter.toPubSubMessage(null, + Collections.singletonMap(GcpPubSubHeaders.ORDERING_KEY, "key1")); + assertThat(pubsubMessage).isNotNull(); + assertThat(pubsubMessage.getOrderingKey()).isEqualTo("key1"); + assertThat(pubsubMessage.getAttributesCount()).isZero(); + } + + @Test + public void testToPubSubMessageWitHeader() throws JSONException { + PubsubMessage pubsubMessage = this.converter.toPubSubMessage(null, + Collections.singletonMap("custom-header", "val1")); + assertThat(pubsubMessage).isNotNull(); + assertThat(pubsubMessage.getOrderingKey()).isEmpty(); + assertThat(pubsubMessage.getAttributesCount()).isOne(); + assertThat(pubsubMessage.getAttributesMap()).containsEntry("custom-header", "val1"); } /** diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/converter/SimplePubSubMessageConverterTests.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/converter/SimplePubSubMessageConverterTests.java index b17b79aa96..671d73bc81 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/converter/SimplePubSubMessageConverterTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/converter/SimplePubSubMessageConverterTests.java @@ -18,12 +18,15 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import com.google.cloud.spring.core.util.MapBuilder; +import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; +import org.json.JSONException; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -144,4 +147,15 @@ private void doFromTest(T value) { assertThat(new String(convertedPubSubMessage.getData().toByteArray())).isEqualTo(TEST_STRING); assertThat(convertedPubSubMessage.getAttributesMap()).isEqualTo(TEST_HEADERS); } + + @Test + public void testOrderingKeyHeader() throws JSONException { + SimplePubSubMessageConverter converter = new SimplePubSubMessageConverter(); + PubsubMessage pubsubMessage = converter.toPubSubMessage("test payload", + Collections.singletonMap(GcpPubSubHeaders.ORDERING_KEY, "key1")); + assertThat(pubsubMessage).isNotNull(); + assertThat(pubsubMessage.getOrderingKey()).isEqualTo("key1"); + assertThat(pubsubMessage.getAttributesCount()).isZero(); + } + }