ReplyingKT - Add Simpler Text-Based Correlation Mechanism #2399
Don't use issues to ask questions; use the discussions tab or ask on Stack Overflow. You can add a producer interceptor to log the header: https://kafka.apache.org/documentation/#producerconfigs_interceptor.classes
@garyrussell thanks for the reply. One part of my question was a request to log the correlation id header's UUID value through ReplyingKafkaTemplate; currently it only logs the BigInteger value. I tried plugging in a producer interceptor and printed the Kafka correlation id like below.
It prints the same correlation id generated by the default correlation strategy. But, as I mentioned above, if I take that correlation id and publish the reply manually through a UI, the isPending check in ReplyingKafkaTemplate returns false.
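To see why pasting the printed value into a UI fails: the default correlation header holds the UUID's 16 raw bytes, not its 36-character text form, so the template ends up comparing byte arrays that can never match a typed-in string. A stdlib-only sketch of the two representations (class and method names here are illustrative, not part of spring-kafka):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

public class CorrelationIdDemo {

    // Binary form used by the default correlation strategy: 16 raw bytes.
    static byte[] toBinary(UUID uuid) {
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }

    // Decode the binary header value back to a UUID, e.g. for logging.
    static UUID fromBinary(byte[] value) {
        ByteBuffer bb = ByteBuffer.wrap(value);
        return new UUID(bb.getLong(), bb.getLong());
    }

    public static void main(String[] args) {
        UUID uuid = UUID.randomUUID();
        byte[] binary = toBinary(uuid);
        byte[] typedIn = uuid.toString().getBytes(StandardCharsets.UTF_8);

        System.out.println(binary.length);   // 16 bytes on the wire
        System.out.println(typedIn.length);  // 36 bytes for the text form
        // The two header values never compare equal, hence "No pending reply".
        System.out.println(Arrays.equals(binary, typedIn)); // false
        // But the binary header can be decoded for readable logging.
        System.out.println(fromBinary(binary).equals(uuid)); // true
    }
}
```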
The correlationId is currently a binary representation of a UUID, rather than a simple string representation of the UUID, so it can't be "entered on a UI" (unless you control the UI). We could add a simpler correlation option, but here is a work-around:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.support.KafkaHeaders;

public class Interceptor<K, V, R> implements ProducerInterceptor<K, V>, ConsumerInterceptor<K, R> {

	private static final ConcurrentHashMap<String, byte[]> map = new ConcurrentHashMap<>();

	@Override
	public void configure(Map<String, ?> configs) {
	}

	@Override
	public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
		if (!record.topic().endsWith("replies")) {
			// Decode the 16-byte binary correlation id into a UUID.
			Header header = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
			ByteBuffer bb = ByteBuffer.wrap(header.value());
			long firstLong = bb.getLong();
			long secondLong = bb.getLong();
			UUID uuid = new UUID(firstLong, secondLong);
			System.out.println("header key " + header.key() + " header value " + uuid);
			// Remember the original binary value so it can be restored on the reply.
			map.put(uuid.toString(), header.value());
			record.headers().remove(KafkaHeaders.CORRELATION_ID);
			record.headers().add(KafkaHeaders.CORRELATION_ID, uuid.toString().getBytes(StandardCharsets.UTF_8));
		}
		return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
	}

	@Override
	public void close() {
	}

	@Override
	public ConsumerRecords<K, R> onConsume(ConsumerRecords<K, R> records) {
		records.forEach(rec -> {
			if (rec.topic().endsWith("replies")) {
				// Restore the original binary correlation id so the template's
				// pending-reply lookup succeeds.
				Header header = rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
				rec.headers().remove(KafkaHeaders.CORRELATION_ID);
				byte[] origValue = map.remove(new String(header.value(), StandardCharsets.UTF_8));
				rec.headers().add(KafkaHeaders.CORRELATION_ID, origValue);
			}
		});
		return records;
	}

	@Override
	public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
	}

}
```

It would need some error protection for production use. I needed the following application and properties to test it:

```java
@SpringBootApplication
public class Kgh2399Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh2399Application.class, args);
	}

	@Bean
	ReplyingKafkaTemplate<String, String, String> rkt(ProducerFactory<String, String> pf,
			ConcurrentKafkaListenerContainerFactory<String, String> factory,
			KafkaTemplate<String, String> template) {

		factory.getContainerProperties().setDeliveryAttemptHeader(true);
		factory.setReplyTemplate(template);
		ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("kgh2399-replies");
		container.getContainerProperties().setGroupId("kgh2399-replies");
		ReplyingKafkaTemplate<String, String, String> rTemplate = new ReplyingKafkaTemplate<>(pf, container);
		rTemplate.setDefaultReplyTimeout(Duration.ofMinutes(2));
		return rTemplate;
	}

	@Bean
	KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
		return new KafkaTemplate<>(pf);
	}

	@Bean
	NewTopic topic1() {
		return TopicBuilder.name("kgh2399").partitions(1).replicas(1).build();
	}

	@Bean
	NewTopic topic2() {
		return TopicBuilder.name("kgh2399-replies").partitions(1).replicas(1).build();
	}

	@Bean
	ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> rkt) {
		return args -> {
			RequestReplyMessageFuture<String, String> future = rkt
					.sendAndReceive(MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.TOPIC, "kgh2399").build());
			System.out.println(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata());
			Message<?> record = future.get(3, TimeUnit.MINUTES);
			System.out.println(record.getPayload());
		};
	}

	@KafkaListener(id = "kgh2399", topics = "kgh2399")
	@SendTo
	String listen(String in, @Header(KafkaHeaders.CORRELATION_ID) byte[] corr) {
		System.out.println(in + " corr:" + new String(corr, StandardCharsets.UTF_8));
		return in.toUpperCase();
	}

}
```

```properties
spring.kafka.producer.properties.interceptor.classes=com.example.demo.Interceptor
spring.kafka.consumer.properties.interceptor.classes=com.example.demo.Interceptor
spring.kafka.consumer.auto-offset-reset=earliest
```
Actually, you don't need the map...

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.support.KafkaHeaders;

public class Interceptor<K, V, R> implements ProducerInterceptor<K, V>, ConsumerInterceptor<K, R> {

	@Override
	public void configure(Map<String, ?> configs) {
	}

	@Override
	public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
		if (!record.topic().endsWith("replies")) {
			// Replace the binary correlation id with its String representation.
			Header header = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
			ByteBuffer bb = ByteBuffer.wrap(header.value());
			long firstLong = bb.getLong();
			long secondLong = bb.getLong();
			UUID uuid = new UUID(firstLong, secondLong);
			System.out.println("header key " + header.key() + " header value " + uuid);
			record.headers().remove(KafkaHeaders.CORRELATION_ID);
			record.headers().add(KafkaHeaders.CORRELATION_ID, uuid.toString().getBytes(StandardCharsets.UTF_8));
		}
		return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
	}

	@Override
	public void close() {
	}

	@Override
	public ConsumerRecords<K, R> onConsume(ConsumerRecords<K, R> records) {
		records.forEach(rec -> {
			if (rec.topic().endsWith("replies")) {
				// Rebuild the binary correlation id from the String form; no shared
				// state is needed because the conversion is lossless.
				Header header = rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
				rec.headers().remove(KafkaHeaders.CORRELATION_ID);
				UUID uuid = UUID.fromString(new String(header.value(), StandardCharsets.UTF_8));
				byte[] bytes = new byte[16];
				ByteBuffer bb = ByteBuffer.wrap(bytes);
				bb.putLong(uuid.getMostSignificantBits());
				bb.putLong(uuid.getLeastSignificantBits());
				rec.headers().add(KafkaHeaders.CORRELATION_ID, bytes);
			}
		});
		return records;
	}

	@Override
	public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
	}

}
```
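The stateless variant works because the UUID-to-bytes conversion is lossless in both directions. A stdlib-only check of the round trip the two callbacks perform (class and method names here are illustrative, not from the thread's code):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;

public class RoundTripCheck {

    // What onSend does: 16-byte binary header -> String form.
    static String binaryToText(byte[] value) {
        ByteBuffer bb = ByteBuffer.wrap(value);
        return new UUID(bb.getLong(), bb.getLong()).toString();
    }

    // What onConsume does: String form -> 16-byte binary header.
    static byte[] textToBinary(String text) {
        UUID uuid = UUID.fromString(text);
        byte[] bytes = new byte[16];
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        bb.putLong(uuid.getMostSignificantBits());
        bb.putLong(uuid.getLeastSignificantBits());
        return bytes;
    }

    public static void main(String[] args) {
        byte[] original = textToBinary(UUID.randomUUID().toString());
        // Round-tripping through the String form restores the exact bytes,
        // which is why no map is needed to match replies to pending requests.
        byte[] restored = textToBinary(binaryToText(original));
        System.out.println(Arrays.equals(original, restored)); // true
    }
}
```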
Resolves spring-projects#2399: by default, the correlationId header contains a binary representation of a UUID; add an option to use a String representation instead.
Thanks, Gary, for accepting the minor request. The interceptors are also working fine as per your code.
We are using AggregatingReplyingKafkaTemplate to implement the request/reply pattern between interacting microservices.
For testing purposes, I have a Spring Boot producer application that publishes a message on the request topic and expects a message back from the consumer on the response topic.
I am using a local UI as the consumer, which publishes the response message manually as soon as the request message is sent.
For this, I need to send the Kafka internal correlation id back to the broker.
Is there any way to log/know the generated string value (not the bytes) of the correlation id created by the Kafka template in the Spring Boot app itself (in the logs, maybe)? Can I take that value and put it in the local Kafka UI to send a message back on the receiving topic?
I tried getting the UUID generated by the Kafka template through debugging in the template class and sending it through my local Kafka UI, but the correlation validation fails and I get this message:
No pending reply: <CorrelationId> perhaps timed out, or using a shared reply topic
I checked: the actual UUID is not logged; only the hash key of the byte value and the bytes of the UUID themselves are logged.