Skip to content

Commit f4a0b4b

Browse files
authored
GH-1961: DefaultDestinationTopicResolver: Inconsistencies In Spring Cloud Environment (#1962)
* GH-1961: Close container on ContextRefreshEvent only if event source is the same application context. * GH-1961: Updated Javadoc
1 parent afd52e0 commit f4a0b4b

File tree

2 files changed

+50
-4
lines changed

2 files changed

+50
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.stream.Collectors;
2727
import java.util.stream.IntStream;
2828

29+
import org.springframework.context.ApplicationContext;
2930
import org.springframework.context.ApplicationListener;
3031
import org.springframework.context.event.ContextRefreshedEvent;
3132
import org.springframework.kafka.listener.ListenerExecutionFailedException;
@@ -40,6 +41,7 @@
4041
*
4142
* @author Tomaz Fernandes
4243
* @author Gary Russell
44+
* @author Yvette Quinby
4345
* @since 2.7
4446
*
4547
*/
@@ -56,9 +58,12 @@ public class DefaultDestinationTopicResolver implements DestinationTopicResolver
5658

5759
private final Clock clock;
5860

61+
private final ApplicationContext applicationContext;
62+
5963
private boolean containerClosed;
6064

61-
public DefaultDestinationTopicResolver(Clock clock) {
65+
public DefaultDestinationTopicResolver(Clock clock, ApplicationContext applicationContext) {
66+
this.applicationContext = applicationContext;
6267
this.clock = clock;
6368
this.sourceDestinationsHolderMap = new HashMap<>();
6469
this.destinationsTopicMap = new HashMap<>();
@@ -170,7 +175,13 @@ private DestinationTopic getNextDestinationTopic(List<DestinationTopic> destinat
170175

171176
@Override
172177
public void onApplicationEvent(ContextRefreshedEvent event) {
173-
this.containerClosed = true;
178+
if (Objects.equals(event.getApplicationContext().getId(), this.applicationContext.getId())) {
179+
this.containerClosed = true;
180+
}
181+
}
182+
183+
public boolean isContainerClosed() {
184+
return this.containerClosed;
174185
}
175186

176187
public static class DestinationTopicHolder {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java

+37-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2121
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
22+
import static org.mockito.BDDMockito.given;
2223

2324
import java.math.BigInteger;
2425
import java.time.Clock;
@@ -28,21 +29,34 @@
2829

2930
import org.junit.jupiter.api.BeforeEach;
3031
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.extension.ExtendWith;
33+
import org.mockito.Mock;
34+
import org.mockito.junit.jupiter.MockitoExtension;
3135

36+
import org.springframework.context.ApplicationContext;
37+
import org.springframework.context.event.ContextRefreshedEvent;
3238
import org.springframework.kafka.listener.ListenerExecutionFailedException;
3339
import org.springframework.kafka.listener.TimestampedException;
3440

3541
/**
3642
* @author Tomaz Fernandes
43+
* @author Yvette Quinby
3744
* @since 2.7
3845
*/
46+
@ExtendWith(MockitoExtension.class)
3947
class DefaultDestinationTopicResolverTests extends DestinationTopicTests {
4048

4149
private Map<String, DefaultDestinationTopicResolver.DestinationTopicHolder> destinationTopicMap;
4250

51+
@Mock
52+
private ApplicationContext applicationContext;
53+
54+
@Mock
55+
private ApplicationContext otherApplicationContext;
56+
4357
private final Clock clock = TestClockUtils.CLOCK;
4458

45-
private final DestinationTopicResolver defaultDestinationTopicContainer = new DefaultDestinationTopicResolver(clock);
59+
private DestinationTopicResolver defaultDestinationTopicContainer;
4660

4761
private final long originalTimestamp = Instant.now(this.clock).toEpochMilli();
4862

@@ -53,6 +67,7 @@ class DefaultDestinationTopicResolverTests extends DestinationTopicTests {
5367
@BeforeEach
5468
public void setup() {
5569

70+
defaultDestinationTopicContainer = new DefaultDestinationTopicResolver(clock, applicationContext);
5671
defaultDestinationTopicContainer.addDestinationTopics(allFirstDestinationsTopics);
5772
defaultDestinationTopicContainer.addDestinationTopics(allSecondDestinationTopics);
5873
defaultDestinationTopicContainer.addDestinationTopics(allThirdDestinationTopics);
@@ -152,8 +167,28 @@ private long getExpectedNextExecutionTime(DestinationTopic destinationTopic) {
152167

153168
@Test
154169
void shouldThrowIfAddsDestinationsAfterClosed() {
155-
((DefaultDestinationTopicResolver) defaultDestinationTopicContainer).onApplicationEvent(null);
170+
given(applicationContext.getId()).willReturn("the-context_id");
171+
((DefaultDestinationTopicResolver) defaultDestinationTopicContainer)
172+
.onApplicationEvent(new ContextRefreshedEvent(applicationContext));
156173
assertThatIllegalStateException().isThrownBy(() ->
157174
defaultDestinationTopicContainer.addDestinationTopics(Collections.emptyList()));
158175
}
176+
177+
@Test
178+
void shouldCloseContainerOnContextRefresh() {
179+
given(applicationContext.getId()).willReturn("the-context_id");
180+
((DefaultDestinationTopicResolver) defaultDestinationTopicContainer)
181+
.onApplicationEvent(new ContextRefreshedEvent(applicationContext));
182+
assertThat(((DefaultDestinationTopicResolver) defaultDestinationTopicContainer).isContainerClosed()).isTrue();
183+
}
184+
185+
@Test
186+
void shouldNotCloseContainerOnOtherContextRefresh() {
187+
given(applicationContext.getId()).willReturn("the-context_id");
188+
given(otherApplicationContext.getId()).willReturn("other-context_id");
189+
((DefaultDestinationTopicResolver) defaultDestinationTopicContainer)
190+
.onApplicationEvent(new ContextRefreshedEvent(otherApplicationContext));
191+
assertThat(((DefaultDestinationTopicResolver) defaultDestinationTopicContainer).isContainerClosed()).isFalse();
192+
}
193+
159194
}

0 commit comments

Comments
 (0)