Skip to content

Commit 4d0a23c

Browse files
committed
spring-projectsGH-8609: Fix bug that cannot applied correctly TcpConnectorInterceptor
Fixes spring-projects#8609 * add getLastWrapper() in TcpConnectionSupport * change to passed the instance that geted via getLastWrapper() instead of self instance to TcpSender#addNewConnection
1 parent 59e676a commit 4d0a23c

File tree

2 files changed

+31
-5
lines changed

2 files changed

+31
-5
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2001-2022 the original author or authors.
2+
* Copyright 2001-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import java.util.Collections;
2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.Optional;
2526
import java.util.UUID;
2627
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.TimeUnit;
@@ -48,6 +49,7 @@
4849
*
4950
* @author Gary Russell
5051
* @author Artem Bilan
52+
* @author Kazuki Shimizu
5153
*
5254
* @since 2.0
5355
*
@@ -313,7 +315,7 @@ public void enableManualListenerRegistration() {
313315
public void registerSender(@Nullable TcpSender senderToRegister) {
314316
if (senderToRegister != null) {
315317
this.senders.add(senderToRegister);
316-
senderToRegister.addNewConnection(this);
318+
senderToRegister.addNewConnection(getLastWrapper());
317319
}
318320
}
319321

@@ -327,7 +329,7 @@ public void registerSender(@Nullable TcpSender senderToRegister) {
327329
public void registerSenders(List<TcpSender> sendersToRegister) {
328330
this.senders.addAll(sendersToRegister);
329331
for (TcpSender sender : sendersToRegister) {
330-
sender.addNewConnection(this);
332+
sender.addNewConnection(getLastWrapper());
331333
}
332334
}
333335

@@ -433,6 +435,18 @@ public void setWrapper(TcpConnectionSupport wrapper) {
433435
this.wrapper = wrapper;
434436
}
435437

438+
/**
439+
* Get the {@link TcpConnectionSupport} that wrapping at last.
440+
* <p>
441+
* This method add for fixing the <a href="https://github.com/spring-projects/spring-integration/issues/8609">gh-8609</a>.
442+
* </p>
443+
*
444+
* @return the {@link TcpConnectionSupport} that wrapping at last
445+
*/
446+
TcpConnectionSupport getLastWrapper() {
447+
return Optional.ofNullable(this.wrapper).map(TcpConnectionSupport::getLastWrapper).orElse(this);
448+
}
449+
436450
public String getConnectionFactoryName() {
437451
return this.connectionFactoryName;
438452
}

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,7 +18,9 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.Collections;
21+
import java.util.HashMap;
2122
import java.util.List;
23+
import java.util.Map;
2224
import java.util.concurrent.CountDownLatch;
2325
import java.util.concurrent.TimeUnit;
2426
import java.util.concurrent.atomic.AtomicInteger;
@@ -30,6 +32,7 @@
3032

3133
/**
3234
* @author Gary Russell
35+
* @author Kazuki Shimizu
3336
* @since 5.3.10
3437
*
3538
*/
@@ -81,11 +84,14 @@ private void senderCalledForDeadConnectionClient(AbstractClientConnectionFactory
8184
List<Integer> addOrder = Collections.synchronizedList(new ArrayList<>());
8285
List<Integer> remOrder = Collections.synchronizedList(new ArrayList<>());
8386
AtomicReference<Thread> thread = new AtomicReference<>();
87+
Map<Integer, TcpConnection> interceptorsPerInstance = new HashMap<>();
88+
List<TcpConnection> passedConnectionsToSenderViaAddNewConnection = new ArrayList<>();
8489
class InterceptorFactory extends HelloWorldInterceptorFactory {
8590

8691
@Override
8792
public TcpConnectionInterceptorSupport getInterceptor() {
88-
return new TcpConnectionInterceptorSupport() {
93+
94+
TcpConnectionInterceptorSupport interceptor = new TcpConnectionInterceptorSupport() {
8995

9096
private final int instance = instances.incrementAndGet();
9197

@@ -107,6 +113,8 @@ public synchronized void removeDeadConnection(TcpConnection connection) {
107113
}
108114

109115
};
116+
interceptorsPerInstance.put(instances.get(), interceptor);
117+
return interceptor;
110118
}
111119

112120
}
@@ -118,6 +126,7 @@ public synchronized void removeDeadConnection(TcpConnection connection) {
118126
@Override
119127
public void addNewConnection(TcpConnection connection) {
120128
addOrder.add(99);
129+
passedConnectionsToSenderViaAddNewConnection.add(connection);
121130
adds.countDown();
122131
}
123132

@@ -146,6 +155,9 @@ public synchronized void removeDeadConnection(TcpConnection connection) {
146155
assertThat(remOrder).containsExactly(1, 2, 99, 3, 4, 5, 99, 6);
147156
assertThat(interceptorAddCalled.await(10, TimeUnit.SECONDS)).isTrue();
148157
assertThat(interceptorRemCalled.await(10, TimeUnit.SECONDS)).isTrue();
158+
// should be passed the last interceptor to the real sender via addNewConnection method
159+
assertThat(passedConnectionsToSenderViaAddNewConnection.get(0)).isSameAs(interceptorsPerInstance.get(3));
160+
assertThat(passedConnectionsToSenderViaAddNewConnection.get(1)).isSameAs(interceptorsPerInstance.get(6));
149161
}
150162

151163
}

0 commit comments

Comments
 (0)