-
Notifications
You must be signed in to change notification settings - Fork 690
Making Pub/Sub reactive pull non-blocking #2227
Making Pub/Sub reactive pull non-blocking #2227
Conversation
…nd its implementation, allowing for asynchronous and non-blocking pull of messages
…ullFuture method of the PubSubReactiveFactory, making it non-blocking
@mzeijen Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions. |
@mzeijen Thank you for signing the Contributor License Agreement! |
Codecov Report
@@ Coverage Diff @@
## master #2227 +/- ##
============================================
- Coverage 80.77% 72.57% -8.20%
+ Complexity 2256 2030 -226
============================================
Files 258 258
Lines 7370 7398 +28
Branches 757 753 -4
============================================
- Hits 5953 5369 -584
- Misses 1103 1684 +581
- Partials 314 345 +31
Continue to review full report at Codecov.
|
Yes, please file a separate issue for the error propagation bug, sorry about that! The naming convention could be As far as Guava, Spring projects try to stay away from using it due to having had bad experiences with versioning/api evolution. Now that Google libraries come with a BOM, this should be less of an issue, though, so perhaps we can consider it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mzeijen Thanks for the contribution and a detailed description!
Like Elena, said I think it would be a useful addition to the PubSubTemplate
. However, the error-handling issues in the current code should probably be dealt with separately.
Regarding executors, we should probably continue the pattern of it being configurable.
As far as the impact on PubSubReactiveFactory
, I hope @elefeint could take a first stab at it since she's most familiar with that code.
@elefeint @meltsufin Great that you like it. I will finish this contribution than with the suggested changes. I'll create a separate issue for the error propagation bug then. I'll probably do a merge request for it. Regarding Guava and the Regarding the configurable executor: what should the default executor be in that case? Could it still be the |
The bug report for the exception handling is #2229. |
…g-pull # Conflicts: # spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java # spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactoryTests.java
…perations.pullAsync`
@mzeijen Thanks for the error-handling fix. Regarding executors, for the default, we can definitely use |
That is a great tip to use the |
…to its new setter and the class. Also added some javadoc to other setter methods that had no javadoc yet.
…added tests for PubSubSubscriberOperations.pullNext, because they where missing.
…a autoconfiguration
I believe the pull request is ready now for the thorough review and for merging if it is ok as it is now. What I did is:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the clean code and taking the time to write extra tests that we were missing.
I have two questions I'd like to hash out: which scheduler to use to kick off pollingPull
, and whether callback "recursion" will cause problems in the future.
The rest of my comments are minor.
...a/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java
Show resolved
Hide resolved
...main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java
Outdated
Show resolved
Hide resolved
...main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java
Outdated
Show resolved
Hide resolved
...main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java
Show resolved
Hide resolved
...main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java
Outdated
Show resolved
Hide resolved
...main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplateTests.java
Outdated
Show resolved
Hide resolved
...ubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java
Show resolved
Hide resolved
…d/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java Co-Authored-By: Elena Felder <[email protected]>
…d/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java Co-Authored-By: Elena Felder <[email protected]>
…d/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java Co-Authored-By: Elena Felder <[email protected]>
…st`, as itself can get the project id
…a ObjectProvider<Executor> for the pubSubSubscriberTemplate bean method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again for this work and being patient with us!
I added some additional comments.
...main/java/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java
Show resolved
Hide resolved
...a/org/springframework/cloud/gcp/autoconfigure/pubsub/GcpPubSubReactiveAutoConfiguration.java
Show resolved
Hide resolved
...main/java/org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.java
Show resolved
Hide resolved
…` by moving the validation into `SubscriberFactory.createPullRequest` (where it partially already was)
...sub/src/main/java/org/springframework/cloud/gcp/pubsub/support/DefaultSubscriberFactory.java
Show resolved
Hide resolved
...ubsub/src/main/java/org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.java
Show resolved
Hide resolved
...gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/support/SubscriberFactory.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR looks good. Two minor things:
- Replace
@since 1.3
with@since 1.2.3
(this is unintuitive -- we are staying on the 1.2.x version until we move this project fromspring-cloud
organization toGoogleCloudPlatform
later in the year. The next major spring cloud release will be after we move orgs, so our master is 1.2.3.BUILD-SNAPSHOT right now). - Since you'll be changing the
@since
, add your name in@author
in all the modified classes.
…self as author to all files where I hadn't added myself yet.
I added myself as author to all classes I changed, correct the @SInCE and made sure all copyright years are correctly set now. Should be good now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much!
Awesome that we got this feature merged! My colleagues and I really appreciate this. Also kudos for the way you handle these kinds of contributions. Although it took a while, mostly probably due to the time difference, it was a very pleasant process. I really appreciate the positive attitude with the constructive feedback. This makes me want to do more contributions :). Thanks! |
Thank you -- this was really open source at its best; you've invested a lot of time but now all reactive Pub/Sub users will benefit from the lower overhead. This change will make it into the next Spring Cloud patch release, currently scheduled for mid-May. If you or your team would like to suggest features or discuss your use-cases for any GCP/Spring integrations (or Spanner/Hibernate or Spanner/R2DBC), we can set up a video conference. I'll ping the team e-mail over gitter. |
@elefeint Great! I will contact you if we want to take you up for your offer. |
Please note that this pull request is a proposal and it is not ready yet to be merged.
The current
PubSubReactiveFactory
is based on blocking calls toPubSubTemplate
, to be able to reactively pull messages from Pub/Sub. This isn't as efficient as it could be. Because, the Google Pub/Sub client does offer the ability to do non-blocking asynchronous pulling off messages. However, the current PubSubTemplate doesn't offer any pull methods which use this ability and thus thePubSubReactiveFactory
can't make use of it.In this draft pull request I propose changes that makes the
PubSubReactiveFactory.poll(...)
non-blocking, without breaking its backwards compatibility (except for the exception handling, see note below). I make this possible by adding aPubSubTemplate.pullFuture
method, which returns aListenableFuture
instead of blocking while it requests the messages.I did some testing with this code against the Pub/Sub service, processing millions of messages, and comparing it to the
blocking
variant. It doesn't seem to have any performance downside. It does reduces the number of threads that need to be used (as there is one less thread pool that comes in to play), and doesn't do any blocking of threads.If you think this change makes sense, then I will be happy to finish the work so that it is ready to be merged. If you have any concerns then I will happy to discuss them.
The following things are currently missing or may need to change, before this pull request is ready:
pullFuture
. It is not in line with the other methods, but I need a different name forpull
as it gives a different return type. Thefuture
in the name does clearly state what you will get...pull
related methods inPubSubTemplate
for which a version should be offered that return aListenableFuture
and that don't block.PubSubTemplate.pullFuture
theApiFutures.addCallback
call now uses theMoreExecutors.directExecutor()
executor. Because if it, the callback will be handled by the same thread that handles the response from the PubSub client. I am wondering if this is ok or that we should make this configurable like it is already configurable for theack
,nack
andmodifyAckDeadline
methods. I don't see much benefit in having another thread pool that must be managed by default, though. If it needs to go into a different thread pool then it would be great if we could use something generic, for CPU bound processing, like the Reactorparallel
thread pool. Something to discuss.Note: In the current implementation of the PubSubReactiveFactory, what I can see and what I have tested, the implementation doesn't correctly deal with exceptions being thrown by the
PubSubTemplate
or the Google PubSub client. The exceptions aren't passed upstream to the sink. Also, when doing a backpressure based poll, an exception will result in a "dead" flux, as any exception in the BlockingLimitedDemandPullTask will kill the task and no messages will ever be put in the sink anymore. The Flux doesn't know that it is dead and will simply keep on waiting for messages that will never arrive. This is because the BlockingLimitedDemandPullTask doesn't catch and forward the exception to the sink.In the implementation of this pull request, we forward all exceptions to the sink. A
retry
can then be used on theFlux
, returned bypoll
, to make sure exceptions don't result in a dead stream.The issue in the current implementation
PubSubReactiveFactory
should probably be solved independent of this pull request, if there is a chance that this pull request will take a long time or will never be merged. I can create a ticket for it and a separate pull request, if desired.