
Support incremental dedupe in databricks destination connector #6042


Closed
tuliren opened this issue Sep 14, 2021 · 14 comments
Comments

@tuliren
Contributor

tuliren commented Sep 14, 2021

Tell us about the problem you're trying to solve

Currently the Databricks destination connector only supports the incremental append mode. We want it to support the incremental dedup mode as well.

This relates to #2075, and is a follow-up issue from PR #5998.

Describe the solution you’d like

Use the MERGE command instead of the COPY INTO command to insert new records.

Reference:

COPY INTO SQL command vs. MERGE SQL command

  • The COPY INTO SQL command is designed to support high-performance inserts of new data into Databricks Delta. It does not support updates and deletes.
  • The MERGE SQL command supports inserts, updates, and deletes. It is slower than COPY INTO for inserting data into Delta.
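
To make the direction concrete, here is a rough sketch of the dedup upsert, assuming a staging table already loaded from the S3 files, an illustrative primary key `id`, and the `_airbyte_emitted_at` cursor; the actual connector would derive the key and column list from the configured catalog:

```sql
-- Rough sketch only: upsert the newly staged batch into the final Delta table,
-- keeping one row per primary key. Table, column, and key names are illustrative.
MERGE INTO final_table AS target
USING (
  -- keep only the latest record per primary key within the staged batch
  SELECT id, name, _airbyte_ab_id, _airbyte_emitted_at
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (
             PARTITION BY id
             ORDER BY _airbyte_emitted_at DESC
           ) AS row_num
    FROM staging_table
  ) ranked
  WHERE row_num = 1
) AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
```

As the reference above notes, the trade-off is that MERGE is slower than COPY INTO for pure inserts, in exchange for the update semantics that dedup needs.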

Describe the alternative you’ve considered or used

N/A

Additional context

N/A

Are you willing to submit a PR?

Yes.

@tuliren tuliren added the type/enhancement New feature or request label Sep 14, 2021
@tuliren tuliren self-assigned this Sep 14, 2021
@sherifnada sherifnada added the area/connectors Connector related issues label Oct 14, 2021
@sherifnada sherifnada moved this to Backlog in GL Roadmap Jan 12, 2022
@shrodingers
Contributor

Just wanted to post here to follow the thread, and to ask if I could help on this matter, since I would really be interested in deduped syncs and dbt transformations with Databricks.

@tuliren
Contributor Author

tuliren commented Jan 14, 2022

@shrodingers, thanks for your comment. We probably won't have bandwidth to implement the incremental dedup mode for Databricks this quarter. So feel free to try it if you are interested.

As mentioned in the description, I think the MERGE command may be useful in the implementation. But we have not thoroughly looked into it yet.

@igrankova igrankova moved this from Backlog to Backlog (unscoped) in GL Roadmap Feb 2, 2022
@shrodingers
Contributor

shrodingers commented Apr 15, 2022

@tuliren Coming back to this one: I started to implement normalization on the Databricks connector (in order to handle append_deduped and basic normalization).
Things were going pretty nicely in my tests, until I realized that the connector uses the catalog schema to generate the tables, via the S3 Parquet writer, which normalizes the first level of columns in the raw data and thus entirely bypasses the _airbyte_data JSON blob syncing. However, all the normalization code relies on _airbyte_data to normalize and then dedup the data (see the sketch of the two table shapes at the end of this comment).
I was wondering what the correct way to handle this would be?

  • Change the file format for staging files from Parquet to another one without a schema? [Breaking for users]
  • Create a new raw Parquet writer for S3? [Also breaking]
  • Allow users to choose between json2avro and dbt normalization? (But that may be quite confusing, I think.)
  • Write the blob alongside the first-level columns? (This conflicts a bit with the json2avro blob that already exists for S3 destinations.)
  • Create a different connector, so that one supports normalization through raw-data S3 syncing and dbt transformation, and the other does not but produces tables normalized at the first level?

I'd be glad to get input on this, since I am not sure what the most useful and simple approach is.
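
For reference, here is a sketch of the two table shapes being discussed: the flattened first-level schema the Parquet writer currently produces, versus the raw layout with the record kept as a JSON blob in _airbyte_data that the normalization code expects (the stream and non-Airbyte column names are illustrative):

```sql
-- Shape currently produced by the S3 Parquet writer: first-level columns
-- generated from the catalog schema (illustrative stream "users").
CREATE TABLE users (
  _airbyte_ab_id STRING,
  _airbyte_emitted_at TIMESTAMP,
  id BIGINT,
  name STRING
);

-- Shape the dbt normalization code expects as its input: the whole record
-- kept as a JSON blob in _airbyte_data, which it parses, normalizes, and dedups.
CREATE TABLE _airbyte_raw_users (
  _airbyte_ab_id STRING,
  _airbyte_emitted_at TIMESTAMP,
  _airbyte_data STRING
);
```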

@shrodingers
Contributor

Managed to get a custom implementation of this connector that supports dbt, extending the S3StreamCopier directly and with a different flow than the current implementation.
There are still some small issues (dangling threads and ignored Python errors during normalization, as well as a full refresh issue on the first run).
I'll be glad to submit a PR, but I think I should probably allow switching between the current implementation and the one compatible with dbt transformations, since the two are really different and incompatible (we need to keep _airbyte_data as a JSON blob to do normalization correctly). Anyway, I'll be happy to help, and/or to get some insight on the more Airbyte-ish way to do things :)

@grishick grishick removed this from GL Roadmap Apr 28, 2022
@grishick grishick added the team/destinations Destinations team's backlog label Sep 27, 2022
@tuliren tuliren removed their assignment Oct 27, 2022
@etsybaev etsybaev self-assigned this Feb 8, 2023
@etsybaev
Contributor

etsybaev commented Feb 8, 2023

It seems like we already have a contributor's PR, but it's huge!
#14445

@shrodingers
Contributor

shrodingers commented Feb 8, 2023

@etsybaev that PR is outdated by now, following changes to the Databricks / S3 connectors that have happened since then, as well as other issues I saw arise while using it in production. I also think it contains some code that has nothing to do with the feature (I may have botched my branch diff and added another source connector plus UI changes by mistake), and a lot of the code is the generated test suite for the transformation acceptance tests. I kept it open as a reference for the parts I managed to test on the normalization models / macros and the connector updates. But I'll be glad to keep working on the subject if I can get some insight on certain points, and to resubmit a cleaner / updated pull request later :)

@grishick
Contributor

grishick commented Feb 9, 2023

Blocked by the re-rollout of v1 normalization.

@marijncv

Hey @grishick, is this issue still blocked? Is it a matter of swapping the COPY INTO statement for MERGE INTO, as the OP suggested, or is there something else that makes the implementation difficult?

I'm considering starting to use Airbyte, and having incremental dedupe for the Databricks destination would be a great plus!

@shrodingers
Contributor

Hello @marijncv, I made a custom connector forked from the official one to achieve incremental dedupe + normalization (with the option to only normalize the first level, to avoid having too many tables in the case of deeply nested data). I will eventually submit a PR at some point, but in the meantime, if you're interested and using self-hosted Airbyte, I'd be able to share it with you.

@marijncv

Hey @shrodingers, that sounds great! Would love to try it out

@asilvis

asilvis commented Aug 4, 2023

@shrodingers can you open this PR and/or point me to your implementation as well? I am very interested in evaluating it.

@shrodingers
Contributor

Hi @marijncv @asilvis, I just got back from vacation and will do so as soon as possible!

shrodingers added a commit to Brigad/airbyte that referenced this issue Aug 17, 2023
@shrodingers
Contributor

Just submitted PR #29510 for review, with the code to enable incremental dedup (I may need a hand with finalization / testing, or with edge cases I didn't spot).

@evantahler
Contributor

Closing this issue so it isn't misleading: with Destinations V2 (#26028), no destination should gain "normalization" via dbt.
