Skip to content

Commit 3b6c8b9

Browse files
garyrussellartembilan
authored andcommitted
GH-1961: Polishing
- rename field to something more meaningful - just compare the contexts in the event listener **cherry-pick to 2.7.x**
1 parent f4a0b4b commit 3b6c8b9

File tree

2 files changed

+17
-17
lines changed

2 files changed

+17
-17
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,14 @@ public class DefaultDestinationTopicResolver implements DestinationTopicResolver
6060

6161
private final ApplicationContext applicationContext;
6262

63-
private boolean containerClosed;
63+
private boolean contextRefreshed;
6464

6565
public DefaultDestinationTopicResolver(Clock clock, ApplicationContext applicationContext) {
6666
this.applicationContext = applicationContext;
6767
this.clock = clock;
6868
this.sourceDestinationsHolderMap = new HashMap<>();
6969
this.destinationsTopicMap = new HashMap<>();
70-
this.containerClosed = false;
70+
this.contextRefreshed = false;
7171
}
7272

7373
@Override
@@ -127,7 +127,7 @@ private DestinationTopic getDestinationFor(String topic) {
127127
}
128128

129129
private DestinationTopicHolder getDestinationHolderFor(String topic) {
130-
return this.containerClosed
130+
return this.contextRefreshed
131131
? doGetDestinationFor(topic)
132132
: getDestinationTopicSynchronized(topic);
133133
}
@@ -145,9 +145,9 @@ private DestinationTopicHolder doGetDestinationFor(String topic) {
145145

146146
@Override
147147
public void addDestinationTopics(List<DestinationTopic> destinationsToAdd) {
148-
if (this.containerClosed) {
148+
if (this.contextRefreshed) {
149149
throw new IllegalStateException("Cannot add new destinations, "
150-
+ DefaultDestinationTopicResolver.class.getSimpleName() + " is already closed.");
150+
+ DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed.");
151151
}
152152
synchronized (this.sourceDestinationsHolderMap) {
153153
this.destinationsTopicMap.putAll(destinationsToAdd
@@ -175,13 +175,18 @@ private DestinationTopic getNextDestinationTopic(List<DestinationTopic> destinat
175175

176176
@Override
177177
public void onApplicationEvent(ContextRefreshedEvent event) {
178-
if (Objects.equals(event.getApplicationContext().getId(), this.applicationContext.getId())) {
179-
this.containerClosed = true;
178+
if (Objects.equals(event.getApplicationContext(), this.applicationContext)) {
179+
this.contextRefreshed = true;
180180
}
181181
}
182182

183-
public boolean isContainerClosed() {
184-
return this.containerClosed;
183+
/**
184+
* Return true if the application context is refreshed.
185+
* @return true if refreshed.
186+
* @since 2.7.8
187+
*/
188+
public boolean isContextRefreshed() {
189+
return this.contextRefreshed;
185190
}
186191

187192
public static class DestinationTopicHolder {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java

+3-8
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2121
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
22-
import static org.mockito.BDDMockito.given;
2322

2423
import java.math.BigInteger;
2524
import java.time.Clock;
@@ -167,7 +166,6 @@ private long getExpectedNextExecutionTime(DestinationTopic destinationTopic) {
167166

168167
@Test
169168
void shouldThrowIfAddsDestinationsAfterClosed() {
170-
given(applicationContext.getId()).willReturn("the-context_id");
171169
((DefaultDestinationTopicResolver) defaultDestinationTopicContainer)
172170
.onApplicationEvent(new ContextRefreshedEvent(applicationContext));
173171
assertThatIllegalStateException().isThrownBy(() ->
@@ -176,19 +174,16 @@ void shouldThrowIfAddsDestinationsAfterClosed() {
176174

177175
@Test
178176
void shouldCloseContainerOnContextRefresh() {
179-
given(applicationContext.getId()).willReturn("the-context_id");
180177
((DefaultDestinationTopicResolver) defaultDestinationTopicContainer)
181178
.onApplicationEvent(new ContextRefreshedEvent(applicationContext));
182-
assertThat(((DefaultDestinationTopicResolver) defaultDestinationTopicContainer).isContainerClosed()).isTrue();
179+
assertThat(((DefaultDestinationTopicResolver) defaultDestinationTopicContainer).isContextRefreshed()).isTrue();
183180
}
184181

185182
@Test
186-
void shouldNotCloseContainerOnOtherContextRefresh() {
187-
given(applicationContext.getId()).willReturn("the-context_id");
188-
given(otherApplicationContext.getId()).willReturn("other-context_id");
183+
void shouldNotMarkContainerRefeshedOnOtherContextRefresh() {
189184
((DefaultDestinationTopicResolver) defaultDestinationTopicContainer)
190185
.onApplicationEvent(new ContextRefreshedEvent(otherApplicationContext));
191-
assertThat(((DefaultDestinationTopicResolver) defaultDestinationTopicContainer).isContainerClosed()).isFalse();
186+
assertThat(((DefaultDestinationTopicResolver) defaultDestinationTopicContainer).isContextRefreshed()).isFalse();
192187
}
193188

194189
}

0 commit comments

Comments
 (0)