Skip to content

[improve][io][kca] support fully-qualified topic names in source records #24248

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 7, 2025

Conversation

efcasado
Copy link
Contributor

@efcasado efcasado commented May 5, 2025

Motivation

This change builds on the work previously done in #24221.

The current implementation of the Kafka Connect adaptor in Pulsar IO does not support fully-qualified Pulsar topic names in source records. Instead, it forcefully prepends the default persistent://<tenant>/<namespace>/ prefix to all destination topics. This behavior can be problematic in multi-tenant environments where dynamic topic routing is required.

For example, consider a setup with:

  • A shared PostgreSQL instance where tenants are isolated in separate schemas (e.g. tenant1, tenant2)
  • A shared Pulsar cluster where each tenant has its own Pulsar tenant (e.g. tenant1, tenant2)
  • A single Debezium PostgreSQL source connector deployed in the global (shared) tenant

Using Kafka Connect transformations, users may want to route records to tenant-specific topics based on the PostgreSQL schema:

transforms.reroute.topic.regex: "mydatabaseserver.(.*).orders"
transforms.reroute.topic.replacement: "persistent://$1/procurement/orders"

With this configuration, changes to tenant1.orders should go to persistent://tenant1/procurement/orders, and tenant2.orders to persistent://tenant2/procurement/orders.

However, the current implementation prepends persistent://global/procurement/ to the already fully-qualified topic, resulting in invalid topic names like persistent://global/procurement/persistent://tenant1/procurement/orders. This causes runtime exceptions and connector failure loops.

By supporting fully-qualified topic names, this change enables more flexible and tenant-aware architectures without requiring additional processing layers.

Modifications

This change adjusts AbstractKafkaConnectSource to correctly handle fully-qualified topic names by using the org.apache.pulsar.common.naming.TopicName utility. If the topic name is valid and fully-qualified, it is respected as-is. Otherwise, the adaptor falls back to the existing behavior, ensuring backward compatibility.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Extended the existing KafkaConnectSourceTest test suite with a couple of tests to confirm Kafka Connect Adaptor can handle short and fully-qualified topic names

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label May 5, 2025
@efcasado
Copy link
Contributor Author

efcasado commented May 5, 2025

@lhotari, @AlvaroStream - I hope you don't mind me tagging you in this pull request. I took the liberty to do this because of the discussions we had around #24221 😊

@lhotari
Copy link
Member

lhotari commented May 5, 2025

@lhotari, @AlvaroStream - I hope you don't mind me tagging you in this pull request. I took the liberty to do this because of the discussions we had around #24221 😊

change LGTM, @efcasado. Thanks for continuing the contributions!
Do you have plans to add some basic tests for the change?

@efcasado
Copy link
Contributor Author

efcasado commented May 5, 2025

Do you have plans to add some basic tests for the change?

Sure thing.

@efcasado efcasado force-pushed the kca-fq-destination-topic-names branch 2 times, most recently from 5b35aaa to 6afa94c Compare May 5, 2025 19:07
@efcasado efcasado force-pushed the kca-fq-destination-topic-names branch from 6afa94c to fa9a291 Compare May 5, 2025 19:43
@efcasado
Copy link
Contributor Author

efcasado commented May 5, 2025

Done. I added a couple of simple tests (ie. short and fully-qualified topic names). Passing build can be found here.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great stuff! LGTM

Copy link
Contributor

@liangyepianzhou liangyepianzhou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lhotari lhotari merged commit de1d4c9 into apache:master May 7, 2025
59 of 61 checks passed
lhotari pushed a commit that referenced this pull request May 9, 2025
lhotari pushed a commit that referenced this pull request May 9, 2025
lhotari pushed a commit that referenced this pull request May 9, 2025
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request May 14, 2025
…rds (apache#24248)

(cherry picked from commit de1d4c9)
(cherry picked from commit fd7bfac)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request May 14, 2025
…rds (apache#24248)

(cherry picked from commit de1d4c9)
(cherry picked from commit 2cb88c6)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request May 14, 2025
…rds (apache#24248)

(cherry picked from commit de1d4c9)
(cherry picked from commit fd7bfac)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request May 14, 2025
…rds (apache#24248)

(cherry picked from commit de1d4c9)
(cherry picked from commit 2cb88c6)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request May 15, 2025
…rds (apache#24248)

(cherry picked from commit de1d4c9)
(cherry picked from commit 2cb88c6)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request May 15, 2025
…rds (apache#24248)

(cherry picked from commit de1d4c9)
(cherry picked from commit 2cb88c6)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request May 16, 2025
…rds (apache#24248)

(cherry picked from commit de1d4c9)
(cherry picked from commit 2cb88c6)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request May 18, 2025
…rds (apache#24248)

(cherry picked from commit de1d4c9)
(cherry picked from commit fd7bfac)
nodece pushed a commit to ascentstream/pulsar that referenced this pull request May 28, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants