Skip to content

Commit 191f693

Browse files
authored
Fix memory leak in the FluxMessageChannel (#8622)
The `FluxMessageChannel` can subscribe to any volatile `Publisher`. For example, we can call Reactor Kafka `Sender.send()` for input data and pass its result to the `FluxMessageChannel` for on demand subscription. These publishers are subscribed in the `FluxMessageChannel` and their `Disposable` is stored in the internal `Disposable.Composite` which currently only cleared on `destroy()` * Extract `Disposable` from those internal `subscribe()` calls into an `AtomicReference`. * Use this `AtomicReference` in the `doOnTerminate()` to remove from the `Disposable.Composite` and `dispose()` when such a volatile `Publisher` is completed **Cherry-pick to `6.0.x` & `5.5.x`**
1 parent 3c0927e commit 191f693

File tree

2 files changed

+53
-6
lines changed

2 files changed

+53
-6
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

+32-6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.time.Duration;
2020
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicReference;
2122
import java.util.concurrent.locks.LockSupport;
2223

2324
import org.reactivestreams.Publisher;
@@ -100,27 +101,52 @@ public void subscribe(Subscriber<? super Message<?>> subscriber) {
100101
.share()
101102
.subscribe(subscriber);
102103

103-
this.upstreamSubscriptions.add(
104+
Mono<Boolean> subscribersBarrier =
104105
Mono.fromCallable(() -> this.sink.currentSubscriberCount() > 0)
105106
.filter(Boolean::booleanValue)
106107
.doOnNext(this.subscribedSignal::tryEmitNext)
107108
.repeatWhenEmpty((repeat) ->
108-
this.active ? repeat.delayElements(Duration.ofMillis(100)) : repeat) // NOSONAR
109-
.subscribe());
109+
this.active ? repeat.delayElements(Duration.ofMillis(100)) : repeat); // NOSONAR
110+
111+
addPublisherToSubscribe(Flux.from(subscribersBarrier));
112+
}
113+
114+
private void addPublisherToSubscribe(Flux<?> publisher) {
115+
AtomicReference<Disposable> disposableReference = new AtomicReference<>();
116+
117+
Disposable disposable =
118+
publisher
119+
.doOnTerminate(() -> disposeUpstreamSubscription(disposableReference))
120+
.subscribe();
121+
122+
if (!disposable.isDisposed()) {
123+
if (this.upstreamSubscriptions.add(disposable)) {
124+
disposableReference.set(disposable);
125+
}
126+
}
127+
}
128+
129+
private void disposeUpstreamSubscription(AtomicReference<Disposable> disposableReference) {
130+
Disposable disposable = disposableReference.get();
131+
if (disposable != null) {
132+
this.upstreamSubscriptions.remove(disposable);
133+
disposable.dispose();
134+
}
110135
}
111136

112137
@Override
113138
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
114-
this.upstreamSubscriptions.add(
139+
Flux<Object> upstreamPublisher =
115140
Flux.from(publisher)
116141
.delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next())
117142
.publishOn(this.scheduler)
118143
.flatMap((message) ->
119144
Mono.just(message)
120145
.handle((messageToHandle, syncSink) -> sendReactiveMessage(messageToHandle))
121146
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)))
122-
.contextCapture()
123-
.subscribe());
147+
.contextCapture();
148+
149+
addPublisherToSubscribe(upstreamPublisher);
124150
}
125151

126152
private void sendReactiveMessage(Message<?> message) {

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

+21
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.junit.jupiter.api.Test;
2727
import reactor.core.Disposable;
2828
import reactor.core.publisher.Flux;
29+
import reactor.core.publisher.Mono;
30+
import reactor.test.StepVerifier;
2931

3032
import org.springframework.beans.factory.annotation.Autowired;
3133
import org.springframework.context.annotation.Bean;
@@ -144,6 +146,25 @@ void testFluxMessageChannelCleanUp() throws InterruptedException {
144146
.until(() -> TestUtils.getPropertyValue(flux, "sink.sink.done", Boolean.class));
145147
}
146148

149+
@Test
150+
void noMemoryLeakInFluxMessageChannelForVolatilePublishers() {
151+
FluxMessageChannel messageChannel = new FluxMessageChannel();
152+
153+
StepVerifier stepVerifier = StepVerifier.create(messageChannel)
154+
.expectNextCount(3)
155+
.thenCancel()
156+
.verifyLater();
157+
158+
messageChannel.subscribeTo(Mono.just(new GenericMessage<>("test")));
159+
messageChannel.subscribeTo(Flux.just("test1", "test2").map(GenericMessage::new));
160+
161+
stepVerifier.verify();
162+
163+
Disposable.Composite upstreamSubscriptions =
164+
TestUtils.getPropertyValue(messageChannel, "upstreamSubscriptions", Disposable.Composite.class);
165+
assertThat(upstreamSubscriptions.size()).isEqualTo(0);
166+
}
167+
147168
@Configuration
148169
@EnableIntegration
149170
public static class TestConfiguration {

0 commit comments

Comments
 (0)