Skip to content

Commit 322cbca

Browse files
committed
Support for async/reactive close methods (e.g. R2DBC)
Closes gh-26991
1 parent 2685a35 commit 322cbca

File tree

4 files changed

+230
-18
lines changed

4 files changed

+230
-18
lines changed

spring-beans/spring-beans.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ dependencies {
99
optional("org.apache.groovy:groovy-xml")
1010
optional("org.jetbrains.kotlin:kotlin-reflect")
1111
optional("org.jetbrains.kotlin:kotlin-stdlib")
12+
optional("org.reactivestreams:reactive-streams")
1213
testImplementation(testFixtures(project(":spring-core")))
1314
testImplementation(project(":spring-core-test"))
1415
testImplementation("jakarta.annotation:jakarta.annotation-api")

spring-beans/src/main/java/org/springframework/beans/factory/support/DisposableBeanAdapter.java

+112-17
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,20 @@
2121
import java.lang.reflect.Method;
2222
import java.util.ArrayList;
2323
import java.util.List;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.Future;
2427

2528
import org.apache.commons.logging.Log;
2629
import org.apache.commons.logging.LogFactory;
30+
import org.reactivestreams.Subscriber;
31+
import org.reactivestreams.Subscription;
2732

2833
import org.springframework.beans.BeanUtils;
2934
import org.springframework.beans.factory.DisposableBean;
3035
import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
36+
import org.springframework.core.ReactiveAdapter;
37+
import org.springframework.core.ReactiveAdapterRegistry;
3138
import org.springframework.lang.Nullable;
3239
import org.springframework.util.Assert;
3340
import org.springframework.util.ClassUtils;
@@ -65,8 +72,12 @@ class DisposableBeanAdapter implements DisposableBean, Runnable, Serializable {
6572

6673
private static final String SHUTDOWN_METHOD_NAME = "shutdown";
6774

75+
6876
private static final Log logger = LogFactory.getLog(DisposableBeanAdapter.class);
6977

78+
private static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
79+
"org.reactivestreams.Publisher", DisposableBeanAdapter.class.getClassLoader());
80+
7081

7182
private final Object bean;
7283

@@ -240,7 +251,7 @@ else if (this.destroyMethods != null) {
240251
}
241252
}
242253
else if (this.destroyMethodNames != null) {
243-
for (String destroyMethodName: this.destroyMethodNames) {
254+
for (String destroyMethodName : this.destroyMethodNames) {
244255
Method destroyMethod = determineDestroyMethod(destroyMethodName);
245256
if (destroyMethod != null) {
246257
invokeCustomDestroyMethod(
@@ -287,32 +298,40 @@ private Method findDestroyMethod(Class<?> clazz, String name) {
287298
* assuming a "force" parameter), else logging an error.
288299
*/
289300
private void invokeCustomDestroyMethod(Method destroyMethod) {
301+
if (logger.isTraceEnabled()) {
302+
logger.trace("Invoking custom destroy method '" + destroyMethod.getName() +
303+
"' on bean with name '" + this.beanName + "': " + destroyMethod);
304+
}
305+
290306
int paramCount = destroyMethod.getParameterCount();
291-
final Object[] args = new Object[paramCount];
307+
Object[] args = new Object[paramCount];
292308
if (paramCount == 1) {
293309
args[0] = Boolean.TRUE;
294310
}
295-
if (logger.isTraceEnabled()) {
296-
logger.trace("Invoking custom destroy method '" + destroyMethod.getName() +
297-
"' on bean with name '" + this.beanName + "'");
298-
}
311+
299312
try {
300313
ReflectionUtils.makeAccessible(destroyMethod);
301-
destroyMethod.invoke(this.bean, args);
302-
}
303-
catch (InvocationTargetException ex) {
304-
if (logger.isWarnEnabled()) {
305-
String msg = "Custom destroy method '" + destroyMethod.getName() + "' on bean with name '" +
306-
this.beanName + "' threw an exception";
314+
Object returnValue = destroyMethod.invoke(this.bean, args);
315+
316+
if (returnValue == null) {
317+
// Regular case: a void method
318+
logDestroyMethodCompletion(destroyMethod, false);
319+
}
320+
else if (returnValue instanceof Future<?> future) {
321+
// An async task: await its completion.
322+
future.get();
323+
logDestroyMethodCompletion(destroyMethod, true);
324+
}
325+
else if (!reactiveStreamsPresent || !new ReactiveDestroyMethodHandler().await(destroyMethod, returnValue)) {
307326
if (logger.isDebugEnabled()) {
308-
// Log at warn level like below but add the exception stacktrace only with debug level
309-
logger.warn(msg, ex.getTargetException());
310-
}
311-
else {
312-
logger.warn(msg + ": " + ex.getTargetException());
327+
logger.debug("Unknown return value type from custom destroy method '" + destroyMethod.getName() +
328+
"' on bean with name '" + this.beanName + "': " + returnValue.getClass());
313329
}
314330
}
315331
}
332+
catch (InvocationTargetException | ExecutionException ex) {
333+
logDestroyMethodException(destroyMethod, ex.getCause());
334+
}
316335
catch (Throwable ex) {
317336
if (logger.isWarnEnabled()) {
318337
logger.warn("Failed to invoke custom destroy method '" + destroyMethod.getName() +
@@ -321,6 +340,27 @@ private void invokeCustomDestroyMethod(Method destroyMethod) {
321340
}
322341
}
323342

343+
void logDestroyMethodException(Method destroyMethod, Throwable ex) {
344+
if (logger.isWarnEnabled()) {
345+
String msg = "Custom destroy method '" + destroyMethod.getName() + "' on bean with name '" +
346+
this.beanName + "' propagated an exception";
347+
if (logger.isDebugEnabled()) {
348+
// Log at warn level like below but add the exception stacktrace only with debug level
349+
logger.warn(msg, ex);
350+
}
351+
else {
352+
logger.warn(msg + ": " + ex);
353+
}
354+
}
355+
}
356+
357+
void logDestroyMethodCompletion(Method destroyMethod, boolean async) {
358+
if (logger.isDebugEnabled()) {
359+
logger.debug("Custom destroy method '" + destroyMethod.getName() +
360+
"' on bean with name '" + this.beanName + "' completed" + (async ? " asynchronously" : ""));
361+
}
362+
}
363+
324364

325365
/**
326366
* Serializes a copy of the state of this class,
@@ -443,4 +483,59 @@ private static List<DestructionAwareBeanPostProcessor> filterPostProcessors(
443483
return filteredPostProcessors;
444484
}
445485

486+
487+
/**
488+
* Inner class to avoid a hard dependency on the Reactive Streams API at runtime.
489+
*/
490+
private class ReactiveDestroyMethodHandler {
491+
492+
public boolean await(Method destroyMethod, Object returnValue) throws InterruptedException {
493+
ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(returnValue.getClass());
494+
if (adapter != null) {
495+
CountDownLatch latch = new CountDownLatch(1);
496+
adapter.toPublisher(returnValue).subscribe(new DestroyMethodSubscriber(destroyMethod, latch));
497+
latch.await();
498+
return true;
499+
}
500+
return false;
501+
}
502+
}
503+
504+
505+
/**
506+
* Reactive Streams Subscriber for destroy method completion.
507+
*/
508+
private class DestroyMethodSubscriber implements Subscriber<Object> {
509+
510+
private final Method destroyMethod;
511+
512+
private final CountDownLatch latch;
513+
514+
public DestroyMethodSubscriber(Method destroyMethod, CountDownLatch latch) {
515+
this.destroyMethod = destroyMethod;
516+
this.latch = latch;
517+
}
518+
519+
@Override
520+
public void onSubscribe(Subscription s) {
521+
s.request(Integer.MAX_VALUE);
522+
}
523+
524+
@Override
525+
public void onNext(Object o) {
526+
}
527+
528+
@Override
529+
public void onError(Throwable t) {
530+
this.latch.countDown();
531+
logDestroyMethodException(this.destroyMethod, t);
532+
}
533+
534+
@Override
535+
public void onComplete() {
536+
this.latch.countDown();
537+
logDestroyMethodCompletion(this.destroyMethod, true);
538+
}
539+
}
540+
446541
}

spring-context/src/test/java/org/springframework/context/annotation/DestroyMethodInferenceTests.java

+53-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,8 +17,10 @@
1717
package org.springframework.context.annotation;
1818

1919
import java.io.Closeable;
20+
import java.util.concurrent.CompletableFuture;
2021

2122
import org.junit.jupiter.api.Test;
23+
import reactor.core.publisher.Mono;
2224

2325
import org.springframework.beans.factory.DisposableBean;
2426
import org.springframework.context.ConfigurableApplicationContext;
@@ -47,6 +49,8 @@ public void beanMethods() {
4749
WithInheritedCloseMethod c8 = ctx.getBean("c8", WithInheritedCloseMethod.class);
4850
WithDisposableBean c9 = ctx.getBean("c9", WithDisposableBean.class);
4951
WithAutoCloseable c10 = ctx.getBean("c10", WithAutoCloseable.class);
52+
WithCompletableFutureMethod c11 = ctx.getBean("c11", WithCompletableFutureMethod.class);
53+
WithReactorMonoMethod c12 = ctx.getBean("c12", WithReactorMonoMethod.class);
5054

5155
assertThat(c0.closed).as("c0").isFalse();
5256
assertThat(c1.closed).as("c1").isFalse();
@@ -59,6 +63,8 @@ public void beanMethods() {
5963
assertThat(c8.closed).as("c8").isFalse();
6064
assertThat(c9.closed).as("c9").isFalse();
6165
assertThat(c10.closed).as("c10").isFalse();
66+
assertThat(c11.closed).as("c11").isFalse();
67+
assertThat(c12.closed).as("c12").isFalse();
6268

6369
ctx.close();
6470
assertThat(c0.closed).as("c0").isTrue();
@@ -72,6 +78,8 @@ public void beanMethods() {
7278
assertThat(c8.closed).as("c8").isFalse();
7379
assertThat(c9.closed).as("c9").isTrue();
7480
assertThat(c10.closed).as("c10").isTrue();
81+
assertThat(c11.closed).as("c11").isTrue();
82+
assertThat(c12.closed).as("c12").isTrue();
7583
}
7684

7785
@Test
@@ -171,6 +179,16 @@ public WithDisposableBean c9() {
171179
public WithAutoCloseable c10() {
172180
return new WithAutoCloseable();
173181
}
182+
183+
@Bean
184+
public WithCompletableFutureMethod c11() {
185+
return new WithCompletableFutureMethod();
186+
}
187+
188+
@Bean
189+
public WithReactorMonoMethod c12() {
190+
return new WithReactorMonoMethod();
191+
}
174192
}
175193

176194

@@ -242,4 +260,38 @@ public void close() {
242260
}
243261
}
244262

263+
264+
static class WithCompletableFutureMethod {
265+
266+
boolean closed = false;
267+
268+
public CompletableFuture<Void> close() {
269+
return CompletableFuture.runAsync(() -> {
270+
try {
271+
Thread.sleep(100);
272+
}
273+
catch (InterruptedException e) {
274+
Thread.currentThread().interrupt();
275+
}
276+
closed = true;
277+
});
278+
}
279+
}
280+
281+
282+
static class WithReactorMonoMethod {
283+
284+
boolean closed = false;
285+
286+
public Mono<Void> close() {
287+
try {
288+
Thread.sleep(100);
289+
}
290+
catch (InterruptedException e) {
291+
Thread.currentThread().interrupt();
292+
}
293+
return Mono.fromRunnable(() -> closed = true);
294+
}
295+
}
296+
245297
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.r2dbc.core;
18+
19+
import io.r2dbc.h2.CloseableConnectionFactory;
20+
import io.r2dbc.h2.H2ConnectionFactory;
21+
import io.r2dbc.spi.ConnectionFactory;
22+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
23+
import org.junit.jupiter.api.AfterEach;
24+
25+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
26+
import org.springframework.context.annotation.Bean;
27+
import org.springframework.context.annotation.Configuration;
28+
29+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
30+
31+
/**
32+
* @author Juergen Hoeller
33+
* @since 6.1
34+
*/
35+
public class H2DatabaseClientContextIntegrationTests extends H2DatabaseClientIntegrationTests {
36+
37+
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
38+
39+
CloseableConnectionFactory connectionFactory = context.getBean(CloseableConnectionFactory.class);
40+
41+
42+
@Override
43+
protected ConnectionFactory createConnectionFactory() {
44+
return connectionFactory;
45+
}
46+
47+
@AfterEach
48+
public void tearDown() {
49+
context.close();
50+
assertThatExceptionOfType(R2dbcNonTransientResourceException.class).isThrownBy(
51+
() -> connectionFactory.create().block());
52+
}
53+
54+
55+
@Configuration
56+
static class Config {
57+
58+
@Bean
59+
ConnectionFactory connectionFactory() {
60+
return H2ConnectionFactory.inMemory("r2dbc-context");
61+
}
62+
}
63+
64+
}

0 commit comments

Comments
 (0)