Skip to content

Commit b9af4f7

Browse files
authored
Add support for binder customizers (#186)
`ConsumerEndpointCustomizer<PubSubInboundChannelAdapter>` and `ProducerMessageHandlerCustomizer<PubSubMessageHandler>` user beans can now be provided to customize the Pub/Sub binder. Fixes: #182.
1 parent 2af4c27 commit b9af4f7

File tree

2 files changed

+101
-3
lines changed

2 files changed

+101
-3
lines changed

spring-cloud-gcp-pubsub-stream-binder/src/main/java/com/google/cloud/spring/stream/binder/pubsub/config/PubSubBinderConfiguration.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import com.google.cloud.spring.pubsub.PubSubAdmin;
2222
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
23+
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
24+
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
2325
import com.google.cloud.spring.stream.binder.pubsub.PubSubMessageChannelBinder;
2426
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubExtendedBindingProperties;
2527
import com.google.cloud.spring.stream.binder.pubsub.provisioning.PubSubChannelProvisioner;
@@ -29,14 +31,18 @@
2931
import org.springframework.boot.context.properties.source.ConfigurationPropertyName;
3032
import org.springframework.cloud.stream.binder.Binder;
3133
import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProvider;
34+
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
35+
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
3236
import org.springframework.context.annotation.Bean;
3337
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.lang.Nullable;
3439

3540
/**
3641
* Pub/Sub binder configuration.
3742
*
3843
* @author João André Martins
3944
* @author Daniel Zou
45+
* @author Mike Eltsufin
4046
*/
4147
@Configuration(proxyBeanMethods = false)
4248
@ConditionalOnMissingBean(Binder.class)
@@ -52,10 +58,15 @@ public PubSubChannelProvisioner pubSubChannelProvisioner(PubSubAdmin pubSubAdmin
5258
public PubSubMessageChannelBinder pubSubBinder(
5359
PubSubChannelProvisioner pubSubChannelProvisioner,
5460
PubSubTemplate pubSubTemplate,
55-
PubSubExtendedBindingProperties pubSubExtendedBindingProperties) {
56-
57-
return new PubSubMessageChannelBinder(null, pubSubChannelProvisioner, pubSubTemplate,
61+
PubSubExtendedBindingProperties pubSubExtendedBindingProperties,
62+
@Nullable ProducerMessageHandlerCustomizer<PubSubMessageHandler> producerCustomizer,
63+
@Nullable ConsumerEndpointCustomizer<PubSubInboundChannelAdapter> consumerCustomizer
64+
) {
65+
PubSubMessageChannelBinder binder = new PubSubMessageChannelBinder(null, pubSubChannelProvisioner, pubSubTemplate,
5866
pubSubExtendedBindingProperties);
67+
binder.setProducerMessageHandlerCustomizer(producerCustomizer);
68+
binder.setConsumerEndpointCustomizer(consumerCustomizer);
69+
return binder;
5970
}
6071

6172
@Bean

spring-cloud-gcp-pubsub-stream-binder/src/test/java/com/google/cloud/spring/stream/binder/pubsub/PubSubMessageChannelBinderTests.java

+87
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,52 @@
1616

1717
package com.google.cloud.spring.stream.binder.pubsub;
1818

19+
import java.util.List;
20+
import java.util.Map;
21+
22+
import com.google.api.gax.core.CredentialsProvider;
23+
import com.google.auth.Credentials;
24+
import com.google.cloud.spring.core.GcpProjectIdProvider;
1925
import com.google.cloud.spring.pubsub.PubSubAdmin;
2026
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
27+
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
2128
import com.google.cloud.spring.pubsub.integration.inbound.PubSubMessageSource;
2229
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
2330
import com.google.cloud.spring.stream.binder.pubsub.config.PubSubBinderConfiguration;
2431
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubConsumerProperties;
2532
import com.google.cloud.spring.stream.binder.pubsub.properties.PubSubExtendedBindingProperties;
2633
import com.google.cloud.spring.stream.binder.pubsub.provisioning.PubSubChannelProvisioner;
34+
import org.apache.commons.logging.Log;
35+
import org.apache.commons.logging.LogFactory;
2736
import org.junit.Before;
2837
import org.junit.Test;
2938
import org.junit.runner.RunWith;
3039
import org.mockito.Mock;
3140
import org.mockito.junit.MockitoJUnitRunner;
3241

42+
import org.springframework.beans.DirectFieldAccessor;
3343
import org.springframework.boot.autoconfigure.AutoConfigurations;
44+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
3445
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
46+
import org.springframework.cloud.stream.annotation.EnableBinding;
47+
import org.springframework.cloud.stream.annotation.Input;
48+
import org.springframework.cloud.stream.annotation.StreamListener;
49+
import org.springframework.cloud.stream.binder.Binding;
3550
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
3651
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
52+
import org.springframework.cloud.stream.binder.PollableMessageSource;
53+
import org.springframework.cloud.stream.binding.BindingService;
54+
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
55+
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
56+
import org.springframework.cloud.stream.messaging.Processor;
57+
import org.springframework.cloud.stream.messaging.Sink;
3758
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
3859
import org.springframework.cloud.stream.provisioning.ProducerDestination;
60+
import org.springframework.context.annotation.Bean;
3961
import org.springframework.messaging.MessageChannel;
4062

4163
import static org.assertj.core.api.Assertions.assertThat;
64+
import static org.mockito.Mockito.mock;
4265
import static org.mockito.Mockito.verify;
4366
import static org.mockito.Mockito.when;
4467

@@ -52,6 +75,7 @@
5275
*/
5376
@RunWith(MockitoJUnitRunner.class)
5477
public class PubSubMessageChannelBinderTests {
78+
private static final Log LOGGER = LogFactory.getLog(PubSubMessageChannelBinderTests.class);
5579

5680
PubSubMessageChannelBinder binder;
5781

@@ -148,4 +172,67 @@ public void consumerMaxFetchPropertyPropagatesToMessageSource() {
148172
});
149173
}
150174

175+
@Test
176+
public void testProducerAndConsumerCustomizers() {
177+
baseContext.withUserConfiguration(PubSubBinderTestConfig.class)
178+
.withPropertyValues("spring.cloud.stream.bindings.input.group=testGroup")
179+
.run(context -> {
180+
181+
DirectFieldAccessor channelBindingServiceAccessor = new DirectFieldAccessor(
182+
context.getBean(BindingService.class));
183+
@SuppressWarnings("unchecked")
184+
Map<String, List<Binding<MessageChannel>>> consumerBindings =
185+
(Map<String, List<Binding<MessageChannel>>>) channelBindingServiceAccessor
186+
.getPropertyValue("consumerBindings");
187+
assertThat(new DirectFieldAccessor(
188+
consumerBindings.get("input").get(0)).getPropertyValue(
189+
"lifecycle.beanName"))
190+
.isEqualTo("setByCustomizer:input");
191+
192+
@SuppressWarnings("unchecked")
193+
Map<String, Binding<MessageChannel>> producerBindings =
194+
(Map<String, Binding<MessageChannel>>) channelBindingServiceAccessor
195+
.getPropertyValue("producerBindings");
196+
assertThat(new DirectFieldAccessor(
197+
producerBindings.get("output")).getPropertyValue(
198+
"val$producerMessageHandler.beanName"))
199+
.isEqualTo("setByCustomizer:output");
200+
});
201+
}
202+
203+
public interface PMS {
204+
@Input
205+
PollableMessageSource source();
206+
}
207+
208+
@EnableBinding({ Processor.class, PMS.class })
209+
@EnableAutoConfiguration
210+
public static class PubSubBinderTestConfig {
211+
212+
@Bean
213+
public ConsumerEndpointCustomizer<PubSubInboundChannelAdapter> consumerCustomizer() {
214+
return (p, q, g) -> p.setBeanName("setByCustomizer:" + q);
215+
}
216+
217+
@Bean
218+
public ProducerMessageHandlerCustomizer<PubSubMessageHandler> handlerCustomizer() {
219+
return (handler, destinationName) -> handler.setBeanName("setByCustomizer:" + destinationName);
220+
}
221+
222+
@StreamListener(Sink.INPUT)
223+
public void process(String payload) throws InterruptedException {
224+
LOGGER.info("received: " + payload);
225+
}
226+
227+
@Bean
228+
public GcpProjectIdProvider projectIdProvider() {
229+
return () -> "fake project";
230+
}
231+
232+
@Bean
233+
public CredentialsProvider googleCredentials() {
234+
return () -> mock(Credentials.class);
235+
}
236+
237+
}
151238
}

0 commit comments

Comments
 (0)