Skip to content

Commit 9e11e39

Browse files
kateryna216GitHub
authored and
GitHub
committed
Enable Pub/Sub message ordering for publishing (GoogleCloudPlatform#408)
Introduces `GcpPubSubHeaders.ORDERING_KEY` header that will be automatically applied to the `PubsubMessage` up conversion. Also, introduces `spring.cloud.gcp.pubsub.publisher.enable-message-ordering` property to enable message ordering on the `Publisher`. First step in addressing: GoogleCloudPlatform#85.
1 parent 33beac0 commit 9e11e39

File tree

13 files changed

+188
-49
lines changed

13 files changed

+188
-49
lines changed

docs/src/main/asciidoc/pubsub.adoc

+12
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ The delay threshold to use for batching.
7777
After this amount of time has elapsed (counting from the first element added), the elements will be wrapped up in a batch and sent. | No | 1 ms (batching off)
7878
| `spring.cloud.gcp.pubsub.publisher.batching.enabled`|
7979
Enables batching. | No | false
80+
| `spring.cloud.gcp.pubsub.publisher.enable-message-ordering`|
81+
Enables message ordering. | No | false
8082
|===
8183

8284
==== GRPC Connection Settings
@@ -176,6 +178,16 @@ include::{project-root}/spring-cloud-gcp-autoconfigure/src/test/java/com/google/
176178

177179
By default, the `SimplePubSubMessageConverter` is used to convert payloads of type `byte[]`, `ByteString`, `ByteBuffer`, and `String` to Pub/Sub messages.
178180

181+
===== Ordering messages
182+
183+
If you are relying on message converters and would like to provide an ordering key, use the `GcpPubSubHeaders.ORDERING_KEY` header.
184+
You will also need to make sure to enable message ordering on the publisher via the `spring.cloud.gcp.pubsub.publisher.enable-message-ordering` property.
185+
186+
[source,java,indent=0]
187+
----
188+
include::{project-root}/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/it/PubSubTemplateDocumentationIntegrationTests.java[tag=publish_ordering]
189+
----
190+
179191
==== Subscribing to a subscription
180192

181193
Google Cloud Pub/Sub allows many subscriptions to be associated to the same topic.

docs/src/main/asciidoc/spring-integration-pubsub.adoc

+4-1
Original file line numberDiff line numberDiff line change
@@ -296,9 +296,12 @@ include::{project-root}/spring-cloud-gcp-autoconfigure/src/test/java/com/google/
296296

297297
These channel adapters contain header mappers that allow you to map, or filter out, headers from Spring to Google Cloud Pub/Sub messages, and vice-versa.
298298
By default, the inbound channel adapter maps every header on the Google Cloud Pub/Sub messages to the Spring messages produced by the adapter.
299-
The outbound channel adapter maps every header from Spring messages into Google Cloud Pub/Sub ones, except the ones added by Spring, like headers with key `"id"`, `"timestamp"` and `"gcp_pubsub_acknowledgement"`.
299+
The outbound channel adapter maps every header from Spring messages into Google Cloud Pub/Sub ones, except the ones added by Spring and some special headers, like headers with key `"id"`, `"timestamp"`, `"gcp_pubsub_acknowledgement"`, and `"gcp_pubsub_ordering_key"`.
300300
In the process, the outbound mapper also converts the value of the headers into string.
301301

302+
Note that you can provide the `GcpPubSubHeaders.ORDERING_KEY` (`"gcp_pubsub_ordering_key"`) header, which will be automatically mapped to `PubsubMessage.orderingKey` property, and excluded from the headers in the published message.
303+
Remember to set `spring.cloud.gcp.pubsub.publisher.enable-message-ordering` to `true`, if you are publishing messages with this header.
304+
302305
Each adapter declares a `setHeaderMapper()` method to let you further customize which headers you want to map from Spring to Google Cloud Pub/Sub, and vice-versa.
303306

304307
For example, to filter out headers `"foo"`, `"bar"` and all headers starting with the prefix "prefix_", you can use `setHeaderMapper()` along with the `PubSubHeaderMapper` implementation provided by this module.

spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java

+1
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ public PublisherFactory defaultPublisherFactory(
309309
factory.setChannelProvider(publisherTransportChannelProvider);
310310
retrySettings.ifAvailable(factory::setRetrySettings);
311311
batchingSettings.ifAvailable(factory::setBatchingSettings);
312+
factory.setEnableMessageOrdering(gcpPubSubProperties.getPublisher().getEnableMessageOrdering());
312313
return factory;
313314
}
314315

spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubProperties.java

+13
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ public static class Publisher {
126126
*/
127127
private final Batching batching = new Batching();
128128

129+
/**
130+
* Enable message ordering setting.
131+
*/
132+
private Boolean enableMessageOrdering;
133+
129134
public Batching getBatching() {
130135
return this.batching;
131136
}
@@ -141,6 +146,14 @@ public int getExecutorThreads() {
141146
public void setExecutorThreads(int executorThreads) {
142147
this.executorThreads = executorThreads;
143148
}
149+
150+
public Boolean getEnableMessageOrdering() {
151+
return enableMessageOrdering;
152+
}
153+
154+
public void setEnableMessageOrdering(Boolean enableMessageOrdering) {
155+
this.enableMessageOrdering = enableMessageOrdering;
156+
}
144157
}
145158

146159
/**

spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/it/PubSubTemplateDocumentationIntegrationTests.java

+32-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
3535
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
3636
import com.google.cloud.spring.pubsub.support.AcknowledgeablePubsubMessage;
37+
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
3738
import com.google.cloud.spring.pubsub.support.converter.ConvertedAcknowledgeablePubsubMessage;
3839
import com.google.cloud.spring.pubsub.support.converter.JacksonPubSubMessageConverter;
3940
import com.google.cloud.spring.pubsub.support.converter.PubSubMessageConverter;
@@ -65,7 +66,8 @@
6566
public class PubSubTemplateDocumentationIntegrationTests {
6667

6768
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
68-
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period=0")
69+
.withPropertyValues("spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period=0",
70+
"spring.cloud.gcp.pubsub.publisher.enable-message-ordering=true")
6971
.withConfiguration(AutoConfigurations.of(GcpContextAutoConfiguration.class,
7072
GcpPubSubAutoConfiguration.class));
7173

@@ -93,6 +95,35 @@ public void testCreatePublishPullNextAndDelete() {
9395
});
9496
}
9597

98+
@Test
99+
public void testCreatePublishPullNextAndDelete_ordering() {
100+
pubSubTest((AssertableApplicationContext context, PubSubTemplate pubSubTemplate, String subscriptionName, String topicName) -> {
101+
//tag::publish_ordering[]
102+
Map<String, String> headers = Collections.singletonMap(GcpPubSubHeaders.ORDERING_KEY, "key1");
103+
pubSubTemplate.publish(topicName, "message1", headers).get();
104+
pubSubTemplate.publish(topicName, "message2", headers).get();
105+
//end::publish_ordering[]
106+
107+
// message1
108+
await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
109+
PubsubMessage pubsubMessage = pubSubTemplate.pullNext(subscriptionName);
110+
111+
assertThat(pubsubMessage).isNotNull();
112+
assertThat(pubsubMessage.getData()).isEqualTo(ByteString.copyFromUtf8("message1"));
113+
assertThat(pubsubMessage.getAttributesCount()).isZero();
114+
});
115+
116+
// message2
117+
await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
118+
PubsubMessage pubsubMessage = pubSubTemplate.pullNext(subscriptionName);
119+
120+
assertThat(pubsubMessage).isNotNull();
121+
assertThat(pubsubMessage.getData()).isEqualTo(ByteString.copyFromUtf8("message2"));
122+
assertThat(pubsubMessage.getAttributesCount()).isZero();
123+
});
124+
});
125+
}
126+
96127

97128
private void pubSubTest(PubSubTest pubSubTest, Class... configClass) {
98129
ApplicationContextRunner contextRunner = configClass.length == 0 ? this.contextRunner

spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java

+41-23
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class DefaultPublisherFactory implements PublisherFactory {
6161

6262
private BatchingSettings batchingSettings;
6363

64+
private Boolean enableMessageOrdering;
65+
6466
/**
6567
* Create {@link DefaultPublisherFactory} instance based on the provided {@link GcpProjectIdProvider}.
6668
* <p>The {@link GcpProjectIdProvider} must not be null, neither provide an empty {@code projectId}.
@@ -123,36 +125,22 @@ public void setBatchingSettings(BatchingSettings batchingSettings) {
123125
this.batchingSettings = batchingSettings;
124126
}
125127

128+
/**
129+
* Set whether message ordering should be enabled on the publisher.
130+
* @param enableMessageOrdering whether to enable message ordering
131+
*/
132+
public void setEnableMessageOrdering(Boolean enableMessageOrdering) {
133+
this.enableMessageOrdering = enableMessageOrdering;
134+
}
135+
126136
@Override
127137
public Publisher createPublisher(String topic) {
128138
return this.publishers.computeIfAbsent(topic, key -> {
129139
try {
130140
Publisher.Builder publisherBuilder =
131141
Publisher.newBuilder(PubSubTopicUtils.toTopicName(topic, this.projectId));
132142

133-
if (this.executorProvider != null) {
134-
publisherBuilder.setExecutorProvider(this.executorProvider);
135-
}
136-
137-
if (this.channelProvider != null) {
138-
publisherBuilder.setChannelProvider(this.channelProvider);
139-
}
140-
141-
if (this.credentialsProvider != null) {
142-
publisherBuilder.setCredentialsProvider(this.credentialsProvider);
143-
}
144-
145-
if (this.headerProvider != null) {
146-
publisherBuilder.setHeaderProvider(this.headerProvider);
147-
}
148-
149-
if (this.retrySettings != null) {
150-
publisherBuilder.setRetrySettings(this.retrySettings);
151-
}
152-
153-
if (this.batchingSettings != null) {
154-
publisherBuilder.setBatchingSettings(this.batchingSettings);
155-
}
143+
applyPublisherSettings(publisherBuilder);
156144

157145
return publisherBuilder.build();
158146
}
@@ -163,6 +151,36 @@ public Publisher createPublisher(String topic) {
163151
});
164152
}
165153

154+
void applyPublisherSettings(Publisher.Builder publisherBuilder) {
155+
if (this.executorProvider != null) {
156+
publisherBuilder.setExecutorProvider(this.executorProvider);
157+
}
158+
159+
if (this.channelProvider != null) {
160+
publisherBuilder.setChannelProvider(this.channelProvider);
161+
}
162+
163+
if (this.credentialsProvider != null) {
164+
publisherBuilder.setCredentialsProvider(this.credentialsProvider);
165+
}
166+
167+
if (this.headerProvider != null) {
168+
publisherBuilder.setHeaderProvider(this.headerProvider);
169+
}
170+
171+
if (this.retrySettings != null) {
172+
publisherBuilder.setRetrySettings(this.retrySettings);
173+
}
174+
175+
if (this.batchingSettings != null) {
176+
publisherBuilder.setBatchingSettings(this.batchingSettings);
177+
}
178+
179+
if (this.enableMessageOrdering != null) {
180+
publisherBuilder.setEnableMessageOrdering(this.enableMessageOrdering);
181+
}
182+
}
183+
166184
Map<String, Publisher> getCache() {
167185
return this.publishers;
168186
}

spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/GcpPubSubHeaders.java

+5
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ private GcpPubSubHeaders() {
4444
*/
4545
public static final String ORIGINAL_MESSAGE = PREFIX + "original_message";
4646

47+
/**
48+
* The Pub/Sub message ordering key.
49+
*/
50+
public static final String ORDERING_KEY = PREFIX + "ordering_key";
51+
4752
/**
4853
* A simple utility method for pulling the {@link #ORIGINAL_MESSAGE} header out of a {@link Message}.
4954
*

spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/JacksonPubSubMessageConverter.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,8 @@ public JacksonPubSubMessageConverter(ObjectMapper objectMapper) {
4848
@Override
4949
public PubsubMessage toPubSubMessage(Object payload, Map<String, String> headers) {
5050
try {
51-
PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder()
52-
.setData(ByteString.copyFrom(this.objectMapper.writeValueAsBytes(payload)));
53-
54-
if (headers != null) {
55-
pubsubMessageBuilder.putAllAttributes(headers);
56-
}
57-
58-
return pubsubMessageBuilder.build();
51+
return byteStringToPubSubMessage(ByteString.copyFrom(
52+
this.objectMapper.writeValueAsBytes(payload)), headers);
5953
}
6054
catch (JsonProcessingException ex) {
6155
throw new PubSubMessageConversionException("JSON serialization of an object of type " +

spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/PubSubMessageConverter.java

+18
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.util.Map;
2020

21+
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
22+
import com.google.protobuf.ByteString;
2123
import com.google.pubsub.v1.PubsubMessage;
2224

2325
/**
@@ -44,4 +46,20 @@ public interface PubSubMessageConverter {
4446
* @return the object converted from the message's payload
4547
*/
4648
<T> T fromPubSubMessage(PubsubMessage message, Class<T> payloadType);
49+
50+
default PubsubMessage byteStringToPubSubMessage(ByteString payload, Map<String, String> headers) {
51+
PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder()
52+
.setData(payload);
53+
54+
if (headers != null) {
55+
pubsubMessageBuilder.putAllAttributes(headers);
56+
57+
if (headers.containsKey(GcpPubSubHeaders.ORDERING_KEY)) {
58+
pubsubMessageBuilder.removeAttributes(GcpPubSubHeaders.ORDERING_KEY);
59+
pubsubMessageBuilder.setOrderingKey(headers.get(GcpPubSubHeaders.ORDERING_KEY));
60+
}
61+
}
62+
63+
return pubsubMessageBuilder.build();
64+
}
4765
}

spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/converter/SimplePubSubMessageConverter.java

+1-8
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,7 @@ else if (payload instanceof byte[]) {
6363
payload.getClass().getName() + " to byte[] for sending to Pub/Sub.");
6464
}
6565

66-
PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder()
67-
.setData(convertedPayload);
68-
69-
if (headers != null) {
70-
pubsubMessageBuilder.putAllAttributes(headers);
71-
}
72-
73-
return pubsubMessageBuilder.build();
66+
return byteStringToPubSubMessage(convertedPayload, headers);
7467

7568
}
7669

spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandlerTests.java

+21-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.cloud.spring.pubsub.integration.outbound;
1818

19+
import java.util.Collections;
20+
1921
import com.google.cloud.spring.core.util.MapBuilder;
2022
import com.google.cloud.spring.pubsub.core.PubSubOperations;
2123
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
@@ -40,7 +42,6 @@
4042
import static org.mockito.ArgumentMatchers.eq;
4143
import static org.mockito.ArgumentMatchers.isNull;
4244
import static org.mockito.Mockito.spy;
43-
import static org.mockito.Mockito.times;
4445
import static org.mockito.Mockito.verify;
4546
import static org.mockito.Mockito.when;
4647

@@ -86,7 +87,7 @@ public void setUp() {
8687
@Test
8788
public void testPublish() {
8889
this.adapter.handleMessage(this.message);
89-
verify(this.pubSubTemplate, times(1))
90+
verify(this.pubSubTemplate)
9091
.publish(eq("testTopic"), eq("testPayload".getBytes()), anyMap());
9192
}
9293

@@ -99,7 +100,7 @@ public void testPublishDynamicTopic() {
99100
.put(GcpPubSubHeaders.TOPIC, "dynamicTopic")
100101
.build());
101102
this.adapter.handleMessage(dynamicMessage);
102-
verify(this.pubSubTemplate, times(1))
103+
verify(this.pubSubTemplate)
103104
.publish(eq("dynamicTopic"), eq("testPayload".getBytes()), anyMap());
104105
}
105106

@@ -114,7 +115,7 @@ public void testSendToExpressionTopic() {
114115
.put("sendToTopic", "expressionTopic")
115116
.build());
116117
this.adapter.handleMessage(expressionMessage);
117-
verify(this.pubSubTemplate, times(1))
118+
verify(this.pubSubTemplate)
118119
.publish(eq("expressionTopic"), eq("testPayload".getBytes()), anyMap());
119120
}
120121

@@ -125,7 +126,7 @@ public void testPublishSync() {
125126
this.adapter.setPublishTimeoutExpression(timeout);
126127

127128
this.adapter.handleMessage(this.message);
128-
verify(timeout, times(1)).getValue(isNull(), eq(this.message), eq(Long.class));
129+
verify(timeout).getValue(isNull(), eq(this.message), eq(Long.class));
129130
}
130131

131132
@Test
@@ -148,7 +149,7 @@ public void onSuccess(String result) {
148149

149150
assertThat(this.adapter.getPublishCallback()).isSameAs(callbackSpy);
150151

151-
verify(callbackSpy, times(1)).onSuccess("benfica");
152+
verify(callbackSpy).onSuccess("benfica");
152153
}
153154

154155
@Test
@@ -227,4 +228,18 @@ public void testSetHeaderMapperWithNull() {
227228

228229
this.adapter.setHeaderMapper(null);
229230
}
231+
232+
@Test
233+
public void testPublishWithOrderingKey() {
234+
this.message = new GenericMessage<byte[]>("testPayload".getBytes(),
235+
new MapBuilder<String, Object>()
236+
.put(GcpPubSubHeaders.ORDERING_KEY, "key1")
237+
.build());
238+
239+
this.adapter.handleMessage(this.message);
240+
verify(this.pubSubTemplate)
241+
.publish("testTopic", "testPayload".getBytes(),
242+
Collections.singletonMap(GcpPubSubHeaders.ORDERING_KEY, "key1"));
243+
}
244+
230245
}

0 commit comments

Comments
 (0)