-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Bigquery / Bulk Load CDK: implement T+D, truncate refresh, etc #57578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bigquery / Bulk Load CDK: implement T+D, truncate refresh, etc #57578
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
9d266d5
to
8acd8fb
Compare
f7306f6
to
d9fe774
Compare
bfef4d8
to
e2e04af
Compare
5e17a90
to
27e0aec
Compare
e2e04af
to
47b9096
Compare
27e0aec
to
a72b07f
Compare
765cf5b
to
29483a6
Compare
a72b07f
to
ff01262
Compare
29483a6
to
73e6ae3
Compare
* | ||
* Callers are encouraged to use the static factory methods instead of the public constructor. | ||
*/ | ||
data class Sql(val transactions: List<List<String>>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a copy of the existing Sql class
|
||
package io.airbyte.cdk.load.orchestration | ||
|
||
interface DestinationHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is similar to the existing DestinationHandler interface, but trimmed down. The "initial status" stuff has ben moved to DestinationInitialStatusGatherer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What a name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, this refers to the Destination not the Destination. I was confused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really a DatabaseDestinationHandler?
also FYI the other db stuff has db in the namespace (cdk.load.db.orchestration)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or StructuredDataStoreHandler
if you want to get really pedantic, since it will also cover warehouses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll go with "database", I feel like context is sufficiently clear that it also refers to warehouses :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed in (as yet unpushed) bafeb80
|
||
import io.airbyte.cdk.load.orchestration.DestinationInitialStatus | ||
import java.time.Instant | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is mostly identical to the old DestinationInitialStatus class, but cleaned up for kotlin style.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some nits / tweaks but I think this looks good
object TypingDedupingUtil { | ||
// copied wholesale from old CDK's StreamId | ||
fun concatenateRawTableName(namespace: String, name: String): String { | ||
val plainConcat = namespace + name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thoughts about this:
val plain = namespace + name
// Find all underscore‐runs, get the max length (or 1 if none)
val longestRun = Regex("_+")
.findAll(plain)
.map { it.value.length }
.maxOrNull()
.let { max(it ?: 0, 1) }
// Build: namespace + "_raw" + "_" * (longestRun + 1) + "stream_" + name
val underscores = "_".repeat(longestRun + 1)
return buildString {
append(namespace)
append("_raw")
append(underscores)
append("stream_")
append(name)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done in 7477f85 (I used a string template instead of buildstring, IMO it's easier to read)
...main/kotlin/io/airbyte/cdk/load/orchestration/legacy_typing_deduping/TypingDedupingWriter.kt
Outdated
Show resolved
Hide resolved
* | ||
* (I have no explanation for the method names.) | ||
*/ | ||
class BigQuerySQLNameTransformer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we mark this as deprecated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mm. I think what we should do is actually rename it + the methods for clarity. Not as part of this PR though, the diff would be really annoying
"""{ | ||
"format_type": "CSV", | ||
"flattening": "No flattening" | ||
}""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have a .trimIndent()
here?
.../kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.kt
Show resolved
Hide resolved
...tlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDatabaseHandler.kt
Show resolved
Hide resolved
...tlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDatabaseHandler.kt
Show resolved
Hide resolved
...tlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDatabaseHandler.kt
Outdated
Show resolved
Hide resolved
...tlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDatabaseHandler.kt
Outdated
Show resolved
Hide resolved
...e/integrations/destination/bigquery/typing_deduping/BigqueryDatabaseInitialStatusGatherer.kt
Show resolved
Hide resolved
...e/integrations/destination/bigquery/typing_deduping/BigqueryDatabaseInitialStatusGatherer.kt
Outdated
Show resolved
Hide resolved
# Conflicts: # airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt # airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.kt # airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.kt # airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTestUtils.kt # airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigqueryWriteTest.kt # airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.kt
# Conflicts: # airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt # airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.kt # airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.kt # airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTestUtils.kt # airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigqueryWriteTest.kt # airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.kt
# Conflicts: # airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt # airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.kt # airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.kt # airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTestUtils.kt # airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/BigqueryWriteTest.kt # airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigqueryDestinationHandlerTest.kt
in very broad strokes:
toolkits/load-db
- the actual core logicBigqueryWriter
.