Skip to content

Commit 7cc4524

Browse files
garyrussellartembilan
authored andcommitted
GH-2419: DLPR: Protect Against Non-Compliant PF
Resolves #2419 We try to obtain the configured request timeout using the producer factory's configuration properties. Some wrappers (e.g. open tracing) do not implement this method, so fallback to the default instead of throwing an USOE. **cherry-pick to 2.9.x, 2.8.x**
1 parent b0c1c98 commit 7cc4524

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,13 @@ private String pubFailMessage(ProducerRecord<Object, Object> outRecord, Consumer
688688
protected Duration determineSendTimeout(KafkaOperations<?, ?> template) {
689689
ProducerFactory<? extends Object, ? extends Object> producerFactory = template.getProducerFactory();
690690
if (producerFactory != null) { // NOSONAR - will only occur in mock tests
691-
Map<String, Object> props = producerFactory.getConfigurationProperties();
691+
Map<String, Object> props;
692+
try {
693+
props = producerFactory.getConfigurationProperties();
694+
}
695+
catch (UnsupportedOperationException ex) {
696+
props = Collections.emptyMap();
697+
}
692698
if (props != null) { // NOSONAR - will only occur in mock tests
693699
return KafkaUtils.determineSendTimeout(props, this.timeoutBuffer,
694700
this.waitForSendResultTimeout.toMillis());

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

+22
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.BDDMockito.given;
2525
import static org.mockito.BDDMockito.then;
2626
import static org.mockito.BDDMockito.willAnswer;
27+
import static org.mockito.BDDMockito.willCallRealMethod;
2728
import static org.mockito.BDDMockito.willReturn;
2829
import static org.mockito.Mockito.atLeastOnce;
2930
import static org.mockito.Mockito.mock;
@@ -876,4 +877,25 @@ void immutableHeaders() {
876877
assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(12);
877878
}
878879

880+
@SuppressWarnings("unchecked")
881+
@Test
882+
void nonCompliantProducerFactory() throws Exception {
883+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
884+
ProducerFactory pf = mock(ProducerFactory.class);
885+
886+
willCallRealMethod().given(pf).getConfigurationProperties();
887+
888+
given(template.getProducerFactory()).willReturn(pf);
889+
CompletableFuture<?> future = mock(CompletableFuture.class);
890+
ArgumentCaptor<Long> timeoutCaptor = ArgumentCaptor.forClass(Long.class);
891+
given(template.send(any(ProducerRecord.class))).willReturn(future);
892+
given(future.get(timeoutCaptor.capture(), eq(TimeUnit.MILLISECONDS))).willThrow(new TimeoutException());
893+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
894+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
895+
recoverer.setFailIfSendResultIsError(true);
896+
assertThatThrownBy(() -> recoverer.accept(record, new RuntimeException()))
897+
.isExactlyInstanceOf(KafkaException.class);
898+
assertThat(timeoutCaptor.getValue()).isEqualTo(Duration.ofSeconds(125).toMillis());
899+
}
900+
879901
}

0 commit comments

Comments
 (0)