-
Notifications
You must be signed in to change notification settings - Fork 3.6k
[improve][io][kca] support fully-qualified topic names in source records #24248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][io][kca] support fully-qualified topic names in source records #24248
Conversation
@lhotari, @AlvaroStream - I hope you don't mind me tagging you in this pull request. I took the liberty to do this because of the discussions we had around #24221 😊 |
change LGTM, @efcasado. Thanks for continuing the contributions! |
Sure thing. |
5b35aaa
to
6afa94c
Compare
6afa94c
to
fa9a291
Compare
Done. I added a couple of simple tests (ie. short and fully-qualified topic names). Passing build can be found here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great stuff! LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…rds (apache#24248) (cherry picked from commit de1d4c9) (cherry picked from commit fd7bfac)
…rds (apache#24248) (cherry picked from commit de1d4c9) (cherry picked from commit 2cb88c6)
…rds (apache#24248) (cherry picked from commit de1d4c9) (cherry picked from commit fd7bfac)
…rds (apache#24248) (cherry picked from commit de1d4c9) (cherry picked from commit 2cb88c6)
…rds (apache#24248) (cherry picked from commit de1d4c9) (cherry picked from commit 2cb88c6)
…rds (apache#24248) (cherry picked from commit de1d4c9) (cherry picked from commit 2cb88c6)
…rds (apache#24248) (cherry picked from commit de1d4c9) (cherry picked from commit 2cb88c6)
…rds (apache#24248) (cherry picked from commit de1d4c9) (cherry picked from commit fd7bfac)
…rds (apache#24248) (cherry picked from commit de1d4c9)
Motivation
This change builds on the work previously done in #24221.
The current implementation of the Kafka Connect adaptor in Pulsar IO does not support fully-qualified Pulsar topic names in source records. Instead, it forcefully prepends the default
persistent://<tenant>/<namespace>/
prefix to all destination topics. This behavior can be problematic in multi-tenant environments where dynamic topic routing is required.For example, consider a setup with:
tenant1
,tenant2
)tenant1
,tenant2
)global
(shared) tenantUsing Kafka Connect transformations, users may want to route records to tenant-specific topics based on the PostgreSQL schema:
With this configuration, changes to
tenant1.orders
should go topersistent://tenant1/procurement/orders
, andtenant2.orders
topersistent://tenant2/procurement/orders
.However, the current implementation prepends
persistent://global/procurement/
to the already fully-qualified topic, resulting in invalid topic names likepersistent://global/procurement/persistent://tenant1/procurement/orders
. This causes runtime exceptions and connector failure loops.By supporting fully-qualified topic names, this change enables more flexible and tenant-aware architectures without requiring additional processing layers.
Modifications
This change adjusts
AbstractKafkaConnectSource
to correctly handle fully-qualified topic names by using theorg.apache.pulsar.common.naming.TopicName
utility. If the topic name is valid and fully-qualified, it is respected as-is. Otherwise, the adaptor falls back to the existing behavior, ensuring backward compatibility.Verifying this change
This change added tests and can be verified as follows:
KafkaConnectSourceTest
test suite with a couple of tests to confirm Kafka Connect Adaptor can handle short and fully-qualified topic namesDocumentation
doc
doc-required
doc-not-needed
doc-complete