Skip to content

Commit d3c3c7f

Browse files
committed
propagate cancellation
1 parent 011258a commit d3c3c7f

File tree

1 file changed

+2
-0
lines changed

1 file changed

+2
-0
lines changed

spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/reactive/PubSubReactiveFactory.java

+2
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ private Flux<List<AcknowledgeablePubsubMessage>> backpressurePull(
129129
int intDemand = numRequested > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) numRequested;
130130
var future = this.subscriberOperations.pullAsync(subscriptionName, intDemand, false);
131131
return Mono.fromFuture(future)
132+
.publishOn(scheduler)
132133
.flatMapMany(messages -> {
133134
long numToPull = numRequested - messages.size();
134135
if (numToPull > 0) {
@@ -159,6 +160,7 @@ private void transferMessages(Flux<List<AcknowledgeablePubsubMessage>> source, F
159160
}
160161
if (destination.isCancelled()) {
161162
messages.forEach(msg -> msg.modifyAckDeadline(0));
163+
source.cancelOn(scheduler);
162164
}
163165
}).doOnError(destination::error).subscribe());
164166
}

0 commit comments

Comments
 (0)