ReplyingKT - Add Simpler Text-Based Correlation Mechanism #2399

Closed
saurav28 opened this issue Sep 19, 2022 · 5 comments · Fixed by #2402

@saurav28

We are using AggregatingReplyingKafkaTemplate to implement the request/reply pattern between interacting microservices.

For testing, I have a Spring Boot producer application that publishes a message to the request topic and expects a reply from the consumer on the response topic.

As the consumer I am using a local Kafka UI, from which I publish the response message manually as soon as the request message is sent.

For this I need to send the Kafka-internal CorrelationId back through the broker.

Is there any way to log or otherwise obtain the string value (not the bytes) of the CorrelationId generated by the Kafka template in the Spring Boot app itself (in the logs, perhaps)? Can I take that value and enter it in the local Kafka UI to send a message back on the reply topic?

I tried getting the UUID generated by the Kafka template by debugging the template class, and then sending it through my local Kafka UI. But the correlation validation fails and I get this message:

No pending reply: <CorrelationId> perhaps timed out, or using a shared reply topic

I checked the logs: the actual UUID is not logged; only the hash key of the byte value and the bytes themselves are.

@garyrussell
Contributor

Don't use issues to ask questions; use the discussions tab or ask on Stack Overflow.

You can add a ProducerInterceptor to log the correlation id header. Add it to the producer factory configs.

https://kafka.apache.org/documentation/#producerconfigs_interceptor.classes
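
As a rough illustration of "add it to the producer factory configs", here is a minimal sketch that builds a producer config map with only the JDK. The property key `interceptor.classes` is the one documented at the link above; the class name `com.example.LoggingInterceptor`, the broker address, and the helper `producerProps` are hypothetical placeholders, not part of Spring for Apache Kafka.

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerInterceptorConfig {

    // Builds a producer config map that registers one interceptor class.
    // The interceptor class name is supplied by the caller (hypothetical here).
    static Map<String, Object> producerProps(String interceptorClassName) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("interceptor.classes", interceptorClassName);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("com.example.LoggingInterceptor"));
    }
}
```

A map like this would typically be handed to whatever creates the producer; in a Spring Boot app the same setting can be expressed as a `spring.kafka.producer.properties.interceptor.classes` property instead.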

@saurav28
Author

saurav28 commented Sep 20, 2022

@garyrussell thanks for the reply.

One part of my question was a request: could ReplyingKafkaTemplate log the CorrelationId header value as a UUID? Currently it only logs the BigInteger value.

I tried plugging in a ProducerInterceptor and printed the Kafka CorrelationId like this:

@Override
    public ProducerRecord onSend(ProducerRecord record) {
        Headers headers = record.headers();

        headers.forEach(header -> {
            if ("CorrelationId".equals(header.key())) {

                ByteBuffer bb = ByteBuffer.wrap(header.value());
                long firstLong = bb.getLong();
                long secondLong = bb.getLong();
                UUID uuid = new UUID(firstLong, secondLong);
                System.out.println(" header key " + header.key() + "header value " + uuid);
            }
        });


        return record;
    }

It prints the same CorrelationId as the one generated by the DefaultCorrelationStrategy.

But, as I mentioned above, if I use this CorrelationId and publish the message manually through a UI, it fails: the isPending check in ReplyingKafkaTemplate returns false.

@garyrussell
Contributor

garyrussell commented Sep 20, 2022

The correlationId is currently a binary representation of a UUID, rather than a simple string representation of the UUID, so it can't be "entered on a UI" (unless you control the UI).
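
To make the difference concrete, here is a small JDK-only sketch. It assumes the binary layout used by the interceptor code in this thread (most significant long first, then least significant long); the class and method names are illustrative only. The same UUID is 16 bytes in binary form and 36 bytes as UTF-8 text, so a value typed into a UI as text cannot match the bytes the template is waiting for.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

final class CorrelationEncodingDemo {

    // Pack a UUID into 16 bytes: most significant long first, then least significant.
    static byte[] toBinary(UUID uuid) {
        ByteBuffer bb = ByteBuffer.allocate(16);
        bb.putLong(uuid.getMostSignificantBits());
        bb.putLong(uuid.getLeastSignificantBits());
        return bb.array();
    }

    // Rebuild the UUID from its 16-byte form (arguments are read left to right).
    static UUID fromBinary(byte[] bytes) {
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        return new UUID(bb.getLong(), bb.getLong());
    }

    public static void main(String[] args) {
        UUID uuid = UUID.fromString("313b815d-dcf2-406f-b850-28c3d6252748");
        byte[] binary = toBinary(uuid);
        byte[] text = uuid.toString().getBytes(StandardCharsets.UTF_8);
        System.out.println("binary length: " + binary.length);               // 16
        System.out.println("text length:   " + text.length);                 // 36
        System.out.println("same bytes:    " + Arrays.equals(binary, text)); // false
        System.out.println("round trip:    " + fromBinary(binary).equals(uuid)); // true
    }
}
```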

We could add a simpler correlation option, but here is a workaround:

public class Interceptor<K, V, R> implements ProducerInterceptor<K, V>, ConsumerInterceptor<K, R> {

	private static final ConcurrentHashMap<String, byte[]> map = new ConcurrentHashMap<>();

	@Override
	public void configure(Map<String, ?> configs) {
	}

	@Override
	public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
		if (!record.topic().endsWith("replies")) {
			Header header = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
			ByteBuffer bb = ByteBuffer.wrap(header.value());
			long firstLong = bb.getLong();
			long secondLong = bb.getLong();
			UUID uuid = new UUID(firstLong, secondLong);
			System.out.println(" header key " + header.key() + "header value " + uuid);
			map.put(uuid.toString(), header.value());
			record.headers().remove(KafkaHeaders.CORRELATION_ID);
			record.headers().add(KafkaHeaders.CORRELATION_ID, uuid.toString().getBytes(StandardCharsets.UTF_8));
		}
		return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
	}

	@Override
	public void close() {
	}

	@Override
	public ConsumerRecords<K, R> onConsume(ConsumerRecords<K, R> records) {
		records.forEach(rec -> {
			if (rec.topic().endsWith("replies")) {
				Header header = rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
				rec.headers().remove(KafkaHeaders.CORRELATION_ID);
				byte[] origValue = map.remove(new String(header.value(), StandardCharsets.UTF_8));
				rec.headers().add(KafkaHeaders.CORRELATION_ID, origValue);
			}
		});
		return records;
	}

	@Override
	public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
	}

}

It would need some error protection for production use.
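
One possible shape for that error protection, as a JDK-only sketch: two conversion helpers that return an empty Optional instead of throwing when the header is missing, has the wrong length, or does not parse as a UUID. The class and method names are hypothetical, and trimming whitespace from the text form is an assumption made for hand-typed values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.UUID;

final class SafeCorrelation {

    // Binary header value -> UTF-8 bytes of the UUID string.
    // Empty if the value is missing or is not exactly 16 bytes.
    static Optional<byte[]> binaryToText(byte[] value) {
        if (value == null || value.length != 16) {
            return Optional.empty();
        }
        ByteBuffer bb = ByteBuffer.wrap(value);
        UUID uuid = new UUID(bb.getLong(), bb.getLong());
        return Optional.of(uuid.toString().getBytes(StandardCharsets.UTF_8));
    }

    // UTF-8 text header value -> 16-byte binary form.
    // Empty if the value is missing or UUID.fromString rejects it.
    static Optional<byte[]> textToBinary(byte[] value) {
        if (value == null) {
            return Optional.empty();
        }
        try {
            String text = new String(value, StandardCharsets.UTF_8).trim();
            UUID uuid = UUID.fromString(text);
            ByteBuffer bb = ByteBuffer.allocate(16);
            bb.putLong(uuid.getMostSignificantBits());
            bb.putLong(uuid.getLeastSignificantBits());
            return Optional.of(bb.array());
        }
        catch (IllegalArgumentException ex) {
            return Optional.empty(); // not a parseable UUID string
        }
    }
}
```

In an interceptor, the idea would be to look up the header, check it for null, and only replace it when the helper returns a value, leaving the record untouched otherwise.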

I needed the if (record.topic()... tests because my consumer is in the same app...

@SpringBootApplication
public class Kgh2399Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh2399Application.class, args);
	}

	@Bean
	ReplyingKafkaTemplate<String, String, String> rkt(ProducerFactory<String, String> pf,
			ConcurrentKafkaListenerContainerFactory<String, String> factory,
			KafkaTemplate<String, String> template) {

		factory.getContainerProperties().setDeliveryAttemptHeader(true);
		factory.setReplyTemplate(template);
		ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("kgh2399-replies");
		container.getContainerProperties().setGroupId("kgh2399-replies");
		ReplyingKafkaTemplate<String, String, String> rTemplate = new ReplyingKafkaTemplate<>(pf, container);
		rTemplate.setDefaultReplyTimeout(Duration.ofMinutes(2));
		return rTemplate;
	}

	@Bean
	KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
		return new KafkaTemplate<>(pf);
	}

	@Bean
	NewTopic topic1() {
		return TopicBuilder.name("kgh2399").partitions(1).replicas(1).build();
	}

	@Bean
	NewTopic topic2() {
		return TopicBuilder.name("kgh2399-replies").partitions(1).replicas(1).build();
	}


	@Bean
	ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> rkt) {
		return args -> {
			RequestReplyMessageFuture<String, String> future = rkt
					.sendAndReceive(MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.TOPIC, "kgh2399").build());
			System.out.println(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata());
			Message<?> record = future.get(3, TimeUnit.MINUTES);
			System.out.println(record.getPayload());
		};
	}

	@KafkaListener(id = "kgh2399", topics = "kgh2399")
	@SendTo
	String listen(String in, @Header(KafkaHeaders.CORRELATION_ID) byte[] corr) {
		System.out.println(in + " corr:" + new String(corr, StandardCharsets.UTF_8));
		return in.toUpperCase();
	}

}

spring.kafka.producer.properties.interceptor.classes=com.example.demo.Interceptor
spring.kafka.consumer.properties.interceptor.classes=com.example.demo.Interceptor
spring.kafka.consumer.auto-offset-reset=earliest

 header key kafka_correlationIdheader value 313b815d-dcf2-406f-b850-28c3d6252748
kgh2399-0@11
foo corr:313b815d-dcf2-406f-b850-28c3d6252748
FOO

@garyrussell garyrussell changed the title from "Fetching the Kafka CorrelationId for testing purpose" to "ReplyingKT - Add Simpler Text-Based Correlation Mechanism" Sep 20, 2022
@garyrussell garyrussell added this to the 3.0.0-RC1 milestone Sep 20, 2022
@garyrussell
Contributor

garyrussell commented Sep 20, 2022

Actually, you don't need the map...

public class Interceptor<K, V, R> implements ProducerInterceptor<K, V>, ConsumerInterceptor<K, R> {

	@Override
	public void configure(Map<String, ?> configs) {
	}

	@Override
	public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
		if (!record.topic().endsWith("replies")) {
			Header header = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
			ByteBuffer bb = ByteBuffer.wrap(header.value());
			long firstLong = bb.getLong();
			long secondLong = bb.getLong();
			UUID uuid = new UUID(firstLong, secondLong);
			System.out.println(" header key " + header.key() + "header value " + uuid);
			record.headers().remove(KafkaHeaders.CORRELATION_ID);
			record.headers().add(KafkaHeaders.CORRELATION_ID, uuid.toString().getBytes(StandardCharsets.UTF_8));
		}
		return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
	}

	@Override
	public void close() {
	}

	@Override
	public ConsumerRecords<K, R> onConsume(ConsumerRecords<K, R> records) {
		records.forEach(rec -> {
			if (rec.topic().endsWith("replies")) {
				Header header = rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
				rec.headers().remove(KafkaHeaders.CORRELATION_ID);
				UUID uuid = UUID.fromString(new String(header.value(), StandardCharsets.UTF_8));
				byte[] bytes = new byte[16];
				ByteBuffer bb = ByteBuffer.wrap(bytes);
				bb.putLong(uuid.getMostSignificantBits());
				bb.putLong(uuid.getLeastSignificantBits());
				rec.headers().add(KafkaHeaders.CORRELATION_ID, bytes);
			}
		});
		return records;
	}

	@Override
	public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
	}

}

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Sep 20, 2022
Resolves spring-projects#2399

By default, the correlationId header contains a binary representation of
a UUID; add an option to use a String representation instead.
artembilan pushed a commit that referenced this issue Sep 20, 2022
Resolves #2399

By default, the correlationId header contains a binary representation of
a UUID; add an option to use a String representation instead.

* Fix test name.
@saurav28
Author

Thanks, Gary, for accepting this minor request.

The interceptors also work fine with your code.
