Bigquery / Bulk Load CDK: implement T+D, truncate refresh, etc #57578

Merged
edgao merged 55 commits into destination_bigquery_bulk_cdk from edgao/bigquery_call_old_code
Apr 24, 2025

Contributor

@edgao edgao commented Apr 11, 2025

in very broad strokes:

  • CollectionUtils, ConnectorExceptionUtils - just copying code from the old CDK to the bulk CDK
  • AirbyteType - adding some utility methods to make our life easier
  • toolkits/load-db - the actual core logic
    • there are a few classes, which represent fundamental functionality
    • and subpackages for different load styles:
      • direct_load_table - this is the skeleton for how we'll implement direct-load tables
      • legacy_typing_deduping - a full implementation of T+D
    • notably: the intended usage is for the destination to build a Factory, which returns the appropriate Writer. See e.g. BigqueryWriter.
  • and then there's just a ton of implementation code in destination-bigquery
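
The Factory-returns-a-Writer usage described above could be sketched roughly as follows. This is a minimal illustration, not the CDK's code: `StreamWriter`, `LoadStyle`, `WriterFactory`, and both writer classes are hypothetical names invented for the example.

```kotlin
// Hypothetical sketch: none of these names are taken from the bulk CDK.

/** What the destination ultimately gets back from its factory. */
interface StreamWriter {
    fun describe(): String
}

/** Stand-in for a writer backed by the legacy_typing_deduping package. */
class LegacyTypingDedupingWriter : StreamWriter {
    override fun describe() = "raw tables + typing and deduping"
}

/** Stand-in for a writer backed by the direct_load_table package. */
class DirectLoadTableWriter : StreamWriter {
    override fun describe() = "direct-load tables"
}

enum class LoadStyle { LEGACY_TYPING_DEDUPING, DIRECT_LOAD_TABLE }

/** The destination builds one factory; the factory picks the writer. */
class WriterFactory(private val style: LoadStyle) {
    fun make(): StreamWriter =
        when (style) {
            LoadStyle.LEGACY_TYPING_DEDUPING -> LegacyTypingDedupingWriter()
            LoadStyle.DIRECT_LOAD_TABLE -> DirectLoadTableWriter()
        }
}

fun main() {
    println(WriterFactory(LoadStyle.LEGACY_TYPING_DEDUPING).make().describe())
}
```

Keeping the choice inside the factory means the rest of the connector only depends on the writer interface, whichever load style is configured.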

@edgao edgao changed the base branch from master to destination_bigquery_bulk_cdk April 11, 2025 16:22
@edgao edgao force-pushed the destination_bigquery_bulk_cdk branch from 9d266d5 to 8acd8fb Compare April 14, 2025 20:02
@edgao edgao force-pushed the edgao/bigquery_call_old_code branch from f7306f6 to d9fe774 Compare April 14, 2025 20:02
@edgao edgao changed the title Edgao/bigquery call old code Edgao/bigquery port interfaces Apr 14, 2025
@edgao edgao force-pushed the edgao/bigquery_call_old_code branch from bfef4d8 to e2e04af Compare April 16, 2025 23:06
@edgao edgao force-pushed the destination_bigquery_bulk_cdk branch from 5e17a90 to 27e0aec Compare April 16, 2025 23:26
@edgao edgao force-pushed the edgao/bigquery_call_old_code branch from e2e04af to 47b9096 Compare April 16, 2025 23:26
@edgao edgao force-pushed the destination_bigquery_bulk_cdk branch from 27e0aec to a72b07f Compare April 16, 2025 23:54
@edgao edgao force-pushed the edgao/bigquery_call_old_code branch from 765cf5b to 29483a6 Compare April 16, 2025 23:56
@edgao edgao force-pushed the destination_bigquery_bulk_cdk branch from a72b07f to ff01262 Compare April 17, 2025 15:13
@edgao edgao force-pushed the edgao/bigquery_call_old_code branch from 29483a6 to 73e6ae3 Compare April 17, 2025 16:43
*
* Callers are encouraged to use the static factory methods instead of the public constructor.
*/
data class Sql(val transactions: List<List<String>>) {
Contributor Author

this is a copy of the existing Sql class
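
For readers unfamiliar with the class, here is a minimal sketch of what "static factory methods instead of the public constructor" can look like for a type shaped like `Sql`. Only the constructor signature appears in the diff above; the factory names `separately` and `transactionally` and their blank-statement filtering are assumptions made for illustration.

```kotlin
/**
 * Each inner list is one transaction; the outer list is the ordered
 * sequence of transactions to run.
 */
data class Sql(val transactions: List<List<String>>) {
    companion object {
        // Assumed factory: every statement runs in its own transaction.
        fun separately(vararg statements: String): Sql =
            Sql(statements.filter { it.isNotBlank() }.map { listOf(it) })

        // Assumed factory: all statements run inside a single transaction.
        fun transactionally(vararg statements: String): Sql {
            val nonBlank = statements.filter { it.isNotBlank() }
            return Sql(if (nonBlank.isEmpty()) emptyList() else listOf(nonBlank))
        }
    }
}

fun main() {
    println(Sql.separately("CREATE SCHEMA a", "CREATE SCHEMA b"))
    println(Sql.transactionally("DELETE FROM t", "INSERT INTO t VALUES (1)"))
}
```

Factories like these let callers state intent (one transaction or several) without building nested lists by hand.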


package io.airbyte.cdk.load.orchestration

interface DestinationHandler {
Contributor Author

this is similar to the existing DestinationHandler interface, but trimmed down. The "initial status" stuff has been moved to DestinationInitialStatusGatherer.

Contributor

What a name

Contributor

Oh, this refers to the Destination not the Destination. I was confused

Contributor

This is really a DatabaseDestinationHandler?

also FYI the other db stuff has db in the namespace (cdk.load.db.orchestration)

Contributor

Or StructuredDataStoreHandler if you want to get really pedantic, since it will also cover warehouses.

Contributor Author

I'll go with "database", I feel like context is sufficiently clear that it also refers to warehouses :P

Contributor Author

renamed in (as yet unpushed) bafeb80
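
To make the split discussed in this thread concrete, here is a rough sketch of a trimmed-down handler with the "initial status" work moved into a separate gatherer. The interface names come from the comments above; every method name, signature, and helper class below is an assumption for illustration and may not match the CDK.

```kotlin
// Hypothetical sketch: method names and signatures are assumptions.
data class Sql(val transactions: List<List<String>>)

/** Trimmed-down handler: it only runs SQL against the destination. */
interface DestinationHandler {
    fun execute(sql: Sql)
}

/** Separate component that inspects existing tables before a sync starts. */
interface DestinationInitialStatusGatherer<Status> {
    fun gatherInitialStatus(tableNames: List<String>): Map<String, Status>
}

/** Toy status type for the example. */
data class TableStatus(val exists: Boolean)

/** In-memory stand-in so the sketch runs without a database. */
class RecordingHandler : DestinationHandler {
    val executed = mutableListOf<String>()

    override fun execute(sql: Sql) {
        sql.transactions.forEach { executed.addAll(it) }
    }
}

/** Reports a table as existing only if it is in the given set. */
class FixedStatusGatherer(private val existing: Set<String>) :
    DestinationInitialStatusGatherer<TableStatus> {
    override fun gatherInitialStatus(tableNames: List<String>): Map<String, TableStatus> =
        tableNames.associateWith { TableStatus(exists = it in existing) }
}

fun main() {
    val statuses = FixedStatusGatherer(setOf("users")).gatherInitialStatus(listOf("users", "orders"))
    println(statuses)
}
```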


import io.airbyte.cdk.load.orchestration.DestinationInitialStatus
import java.time.Instant

Contributor Author

this is mostly identical to the old DestinationInitialStatus class, but cleaned up for Kotlin style.

Contributor

@frifriSF59 frifriSF59 left a comment

Some nits / tweaks but I think this looks good

object TypingDedupingUtil {
// copied wholesale from old CDK's StreamId
fun concatenateRawTableName(namespace: String, name: String): String {
val plainConcat = namespace + name
Contributor

Thoughts about this:

        val plain = namespace + name
        // Find all underscore‐runs, get the max length (or 1 if none)
        val longestRun = Regex("_+")
            .findAll(plain)
            .map { it.value.length }
            .maxOrNull()
            .let { max(it ?: 0, 1) }

        // Build: namespace + "_raw" + "_" * (longestRun + 1) + "stream_" + name
        val underscores = "_".repeat(longestRun + 1)
        return buildString {
            append(namespace)
            append("_raw")
            append(underscores)
            append("stream_")
            append(name)
        }

Contributor Author

done in 7477f85 (I used a string template instead of buildString, IMO it's easier to read)
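
For reference, a self-contained sketch of the suggestion above rewritten with a string template. The contents of 7477f85 are not shown on this page, so this is a guess at the shape, not the committed code; the variable names are illustrative.

```kotlin
import kotlin.math.max

object TypingDedupingUtil {
    fun concatenateRawTableName(namespace: String, name: String): String {
        val plainConcat = namespace + name
        // Length of the longest run of underscores in namespace + name (0 if none).
        val longestUnderscoreRun =
            Regex("_+").findAll(plainConcat).map { it.value.length }.maxOrNull() ?: 0
        // The separator is one underscore longer than any run in the inputs
        // (and at least two), so it cannot be confused with part of a name.
        val separator = "_".repeat(max(longestUnderscoreRun, 1) + 1)
        return "${namespace}_raw${separator}stream_$name"
    }
}

fun main() {
    println(TypingDedupingUtil.concatenateRawTableName("public", "users"))
    println(TypingDedupingUtil.concatenateRawTableName("my__ns", "users"))
}
```

The braces in `${namespace}_raw` are required: without them Kotlin would read `namespace_raw` as the identifier.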

*
* (I have no explanation for the method names.)
*/
class BigQuerySQLNameTransformer {
Contributor

Should we mark this as deprecated?

Contributor Author

mm. I think what we should do is actually rename it + the methods for clarity. Not as part of this PR though, the diff would be really annoying

"""{
"format_type": "CSV",
"flattening": "No flattening"
}"""
Contributor

Should we have a .trimIndent() here?
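
A small sketch of what `.trimIndent()` would and would not do here. `trimIndent()` removes the minimal common indent of all non-blank lines and drops a blank first and last line. When the opening brace sits on the same line as `"""`, that first line has zero indent, so nothing is stripped; the call only helps if the content starts on its own line. The variable names are made up for the example.

```kotlin
// Brace on the opening line: the common indent is 0, so trimIndent() strips nothing.
val inlineConfig = """{
    "format_type": "CSV",
    "flattening": "No flattening"
}"""

// Content starts on its own line: the shared 4-space indent and the
// blank first and last lines are removed.
val trimmedConfig = """
    {
      "format_type": "CSV",
      "flattening": "No flattening"
    }
""".trimIndent()

fun main() {
    println(inlineConfig.trimIndent())
    println(trimmedConfig)
}
```

Since the value is JSON, the extra whitespace is harmless to a parser either way; the difference only matters for readability or exact string comparisons.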

@edgao edgao marked this pull request as ready for review April 24, 2025 16:38
@edgao edgao requested review from a team as code owners April 24, 2025 16:38
@edgao edgao merged commit dfbdfb6 into destination_bigquery_bulk_cdk Apr 24, 2025
14 of 18 checks passed
@edgao edgao deleted the edgao/bigquery_call_old_code branch April 24, 2025 16:38
frifriSF59 pushed a commit that referenced this pull request May 1, 2025
# Conflicts:
#	airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt
#	airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.kt
#	airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.kt
#	airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTestUtils.kt
#	airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigqueryWriteTest.kt
#	airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.kt
frifriSF59 pushed a commit that referenced this pull request May 2, 2025
4 participants