
Support CDC source from log compacted kafka topic #9267


Closed

hzxa21 opened this issue Apr 19, 2023 · 6 comments

@hzxa21 (Collaborator) commented Apr 19, 2023

Is your feature request related to a problem? Please describe.

Many CDC source connectors are designed to work with log-compacted topics in Kafka (examples: PG, MongoDB, MySQL).

In this case, the old record in a CDC message can be wrong or missing, and the current implementation may not handle it well.

In the current design, a CDC source must be materialized and defined with primary key constraints. When there is a PK conflict, the current behavior is to overwrite. Therefore, the implementation enforces UPSERT semantics on all CDC sources, in which case the old record in the CDC message is unused.

Describe the solution you'd like

Given that we enforce the following properties for CDC sources:

  • Must be fully materialized
  • Must have a primary key
  • Overwrite with the latest record on PK conflicts

we can unify the processing of CDC sources with and without log compaction by ignoring the old value in the CDC message, without breaking compatibility. We should also explicitly assert on the above properties in the implementation. If any of these properties changes, that is new behavior anyway and requires further discussion.
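For illustration, a minimal sketch of asserting these properties, assuming hypothetical catalog types (the names do not correspond to RisingWave's actual code):

```rust
// Hypothetical types for illustration only; not RisingWave's catalog API.
struct CdcSourceDef {
    materialized: bool,
    pk_columns: Vec<String>,
}

fn validate_cdc_source(def: &CdcSourceDef) -> Result<(), String> {
    // Property 1: a CDC source must be fully materialized.
    if !def.materialized {
        return Err("CDC source must be materialized".to_string());
    }
    // Property 2: it must declare a primary key. Combined with
    // overwrite-on-PK-conflict (property 3), this yields upsert semantics,
    // so the old value in each CDC message can be safely ignored.
    if def.pk_columns.is_empty() {
        return Err("CDC source must define a primary key".to_string());
    }
    Ok(())
}
```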

Describe alternatives you've considered

No response

Additional context

No response

@hzxa21 added the type/feature label Apr 19, 2023
@github-actions bot added this to the release-0.19 milestone Apr 19, 2023
@StrikeW (Contributor) commented Apr 20, 2023

> In the current design, a CDC source must be materialized and defined with primary key constraints. When there is a PK conflict, the current behavior is to overwrite. Therefore, the implementation enforces UPSERT semantics on all CDC sources, in which case the old record in the CDC message is unused.

IIUC, the current implementation doesn't distinguish between CDC sources with or without log compaction. So we may just need to ensure the three properties you mentioned above?

@hzxa21 (Collaborator, Author) commented May 11, 2023

> We can unify the processing of CDC sources with and without log compaction by ignoring the old value in the CDC message, without breaking compatibility.

To be more specific, there will be two major changes in all CDC format parsers (see the sketch after this list):

  1. The old value in a CDC message with the UPDATE op won't be read, and the parser emits an INSERT instead of an UPDATE message to the downstream.
  2. Since the old value in a CDC message with the DELETE op can be missing due to log compaction, we need to read the Kafka message key to extract the PK and emit a DELETE message with the PK to the downstream, instead of parsing the Kafka message value.
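A minimal Rust sketch of these two changes, with hypothetical types standing in for the real parser interfaces:

```rust
// Illustrative only; these types do not match RisingWave's actual parser.
struct Row(Vec<String>); // placeholder for decoded column values

enum ParsedEvent {
    Insert(Row),
    Delete(Row),
}

// Change 1: drop the old value (`before`) and downgrade UPDATE to INSERT;
// the downstream materialize executor overwrites on PK conflict anyway.
fn on_update(after: Row) -> ParsedEvent {
    ParsedEvent::Insert(after)
}

// Change 2: `before` may be missing after log compaction, so derive the
// PK from the Kafka message key instead of the message value.
fn on_delete(
    message_key: Option<&[u8]>,
    decode_key: impl Fn(&[u8]) -> Row,
) -> Result<ParsedEvent, String> {
    let key = message_key.ok_or("DELETE requires a non-empty message key")?;
    Ok(ParsedEvent::Delete(decode_key(key)))
}
```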

@hzxa21 (Collaborator, Author) commented May 24, 2023

Related PRs: #9944, #9386

I think we can unify the handling of Kafka sources with the Debezium format (sketched in code after the list below):

  • Always read the message key from Kafka and include it in the payload passed to the parser (refer to feat: support message key as a column in avro_upsert #9386).
  • Do not differentiate debezium and upsert debezium (i.e. we won't have an option to mark the upsert property).
    • If message value == null, it must be a tombstone message. Emit DELETE to the downstream using the message key as the DELETE row. Throw an error if the message key is empty.
    • If message value != null, it must contain a valid message value in debezium format.
      • If debezium op == CREATE, emit INSERT to the downstream using the after field in the debezium value as the INSERT row.
      • If debezium op == UPDATE, emit INSERT to the downstream using the after field in the debezium value as the INSERT row.
      • If debezium op == DELETE, emit DELETE to the downstream using the before field as the DELETE row.
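Putting the rules above together, a rough sketch of the dispatch (all names are invented for illustration, not taken from RisingWave's code base):

```rust
struct Row(Vec<String>); // placeholder for decoded column values

enum Op {
    Create,
    Update,
    Delete,
}

struct DebeziumPayload {
    op: Op,
    before: Option<Row>,
    after: Option<Row>,
}

enum ParsedEvent {
    Insert(Row),
    Delete(Row),
}

fn handle_message(
    key: Option<Row>,
    value: Option<DebeziumPayload>,
) -> Result<ParsedEvent, String> {
    match (key, value) {
        // Tombstone: null value, so delete by the key columns.
        (Some(k), None) => Ok(ParsedEvent::Delete(k)),
        // Invalid: a tombstone must carry a message key.
        (None, None) => Err("null value with null key".to_string()),
        (_, Some(payload)) => match payload.op {
            // CREATE and UPDATE both emit INSERT from the `after` image.
            Op::Create | Op::Update => payload
                .after
                .map(ParsedEvent::Insert)
                .ok_or_else(|| "op=CREATE/UPDATE with after=null".to_string()),
            // DELETE emits DELETE from the `before` image.
            Op::Delete => payload
                .before
                .map(ParsedEvent::Delete)
                .ok_or_else(|| "op=DELETE with before=null".to_string()),
        },
    }
}
```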

Summary

  • Valid combinations:

| Kafka Message Key | Kafka Message Value | Debezium OP | Emit-to-downstream OP | Emit-to-downstream Data |
| --- | --- | --- | --- | --- |
| not null | null | - | DELETE | kafka key |
| not null / null | not null | CREATE / UPDATE | INSERT | debezium after |
| not null / null | not null | DELETE | DELETE | debezium before |

  • Invalid combinations:

| Kafka Message Key | Kafka Message Value | Debezium Payload |
| --- | --- | --- |
| null | null | - |
| not null / null | not null | op=CREATE / UPDATE && after=null |
| not null / null | not null | op=DELETE && before=null |

cc @tabVersion @KveinAxel @idx0-dev

One caveat here is that the DELETE row emitted to the downstream can contain a full row (MySQL; PG if REPLICA IDENTITY == FULL) or only the PK part of the row (PG if REPLICA IDENTITY != FULL). Although in theory the downstream materialize executor only needs the PK for DELETE with ConflictBehavior::Overwrite, I am not sure whether we can currently handle that. cc @st1page

@st1page (Contributor) commented May 24, 2023

> One caveat here is that the DELETE row emitted to the downstream can contain a full row (MySQL; PG if REPLICA IDENTITY == FULL) or only the PK part of the row (PG if REPLICA IDENTITY != FULL). Although in theory the downstream materialize executor only needs the PK for DELETE with ConflictBehavior::Overwrite, I am not sure whether we can currently handle that. cc @st1page

In short, the materialize executor with ConflictBehavior::Overwrite does not depend on any old value (update/delete before) and just treats the input stream as an upsert stream.
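As a toy illustration of why only the PK matters for DELETE here (a HashMap standing in for the real state store, which this sketch greatly simplifies):

```rust
use std::collections::HashMap;

// Toy model of overwrite-on-conflict materialization; RisingWave's real
// materialize executor is backed by its state store, not a HashMap.
struct MaterializeState {
    rows: HashMap<Vec<u8>, Vec<u8>>, // encoded PK -> encoded row
}

impl MaterializeState {
    // INSERT blindly overwrites any existing row with the same PK, so no
    // old value (update/delete `before`) is ever consulted.
    fn upsert(&mut self, pk: Vec<u8>, row: Vec<u8>) {
        self.rows.insert(pk, row);
    }

    // DELETE only needs the PK: whether the input event carried the full
    // old row or just the key columns makes no difference.
    fn delete(&mut self, pk: &[u8]) {
        self.rows.remove(pk);
    }
}
```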

@neverchanje (Contributor) commented

Can we close this issue now?

@hzxa21 (Collaborator, Author) commented Jun 21, 2023

> Can we close this issue now?

Yes, it is done by #9944.
