diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java index cbedd30b13..9824463dda 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.function.Function; @@ -64,6 +63,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * Auto-config for Pub/Sub. @@ -110,18 +110,36 @@ public GcpPubSubAutoConfiguration(GcpPubSubProperties gcpPubSubProperties, } } + @Bean + @ConditionalOnMissingBean(name = "pubsubPublisherThreadPool") + public ThreadPoolTaskScheduler pubsubPublisherThreadPool() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(this.gcpPubSubProperties.getPublisher().getExecutorThreads()); + scheduler.setThreadNamePrefix("gcp-pubsub-publisher"); + return scheduler; + } + @Bean @ConditionalOnMissingBean(name = "publisherExecutorProvider") - public ExecutorProvider publisherExecutorProvider() { - return FixedExecutorProvider.create(Executors.newScheduledThreadPool( - this.gcpPubSubProperties.getPublisher().getExecutorThreads())); + public ExecutorProvider publisherExecutorProvider( + @Qualifier("pubsubPublisherThreadPool") ThreadPoolTaskScheduler scheduler) { + return FixedExecutorProvider.create(scheduler.getScheduledExecutor()); + } + + @Bean + @ConditionalOnMissingBean(name = "pubsubSubscriberThreadPool") + public ThreadPoolTaskScheduler pubsubSubscriberThreadPool() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(this.gcpPubSubProperties.getSubscriber().getExecutorThreads()); + scheduler.setThreadNamePrefix("gcp-pubsub-subscriber"); + return scheduler; } @Bean @ConditionalOnMissingBean(name = "subscriberExecutorProvider") - public ExecutorProvider subscriberExecutorProvider() { - return FixedExecutorProvider.create(Executors.newScheduledThreadPool( - this.gcpPubSubProperties.getSubscriber().getExecutorThreads())); + public ExecutorProvider subscriberExecutorProvider( + @Qualifier("pubsubSubscriberThreadPool") ThreadPoolTaskScheduler scheduler) { + return FixedExecutorProvider.create(scheduler.getScheduledExecutor()); } @Bean @@ -138,6 +156,7 @@ public PubSubPublisherTemplate pubSubPublisherTemplate(PublisherFactory publishe public Executor pubSubAcknowledgementExecutor() { ThreadPoolTaskExecutor ackExecutor = new ThreadPoolTaskExecutor(); ackExecutor.setMaxPoolSize(this.gcpPubSubProperties.getSubscriber().getMaxAcknowledgementThreads()); + ackExecutor.setThreadNamePrefix("gcp-pubsub-ack-executor"); return ackExecutor; }