-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Bigquery / Bulk Load CDK: implement T+D, truncate refresh, etc #57578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 35 commits
2ce873a
6e01e97
16c3cfd
0509cf7
80e1447
626fa83
7d718d0
e399c3c
c43d10b
b529a83
0457865
063c79d
c378068
7ffb0b8
4694dea
31da2f9
1f6f2c8
68b82ae
c89574e
09d2497
a852bef
9e4b482
b47374d
f90b2b3
9e1a737
b1c89f0
f5df055
b54cebd
acbedf0
a3fa044
e148eca
298d9bf
9cf1fb3
1692493
d8401ff
dddb3f5
24d32fa
bafeb80
590a904
ef252ef
b4268cf
97dce7a
02cc7cc
e2a1ea7
edfa43c
f9a7794
626a2f7
7477f85
3498570
22078b9
522bd19
c6c1f3a
b1bacbf
1db68c5
2f8e95e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.util | ||
|
||
import java.util.Optional | ||
|
||
object CollectionUtils {
    /**
     * Case-insensitive membership test.
     *
     * @param collection the values to look through
     * @param search the value being looked for
     * @return `true` when [matchingKey] finds an entry for [search] in [collection]
     */
    @JvmStatic
    fun containsIgnoreCase(collection: Collection<String>, search: String): Boolean =
        matchingKey(collection, search).isPresent

    /**
     * Case-insensitive check that every one of several terms is present in a collection.
     *
     * An empty [searchTerms] is rejected: "contains all of nothing" would trivially be true,
     * which is more likely to hide a caller bug than to be a meaningful answer.
     *
     * @param searchCollection the collection you want to check membership in
     * @param searchTerms the keys you're looking for; must not be empty
     * @return whether all [searchTerms] are in [searchCollection], ignoring case
     * @throws IllegalArgumentException if [searchTerms] is empty
     */
    @JvmStatic
    fun containsAllIgnoreCase(
        searchCollection: Collection<String>,
        searchTerms: Collection<String>
    ): Boolean {
        require(searchTerms.isNotEmpty()) { "Search Terms collection may not be empty" }
        // "all terms are present" expressed as "no term is missing"
        return searchTerms.none { term -> !containsIgnoreCase(searchCollection, term) }
    }

    /**
     * Find the entry of [collection] that corresponds to [search], ignoring case.
     *
     * An exact (case-sensitive) match takes precedence; otherwise the first entry that is equal
     * to [search] ignoring case is used.
     *
     * @param collection the collection to search
     * @param search the key you're looking for
     * @return an Optional holding the matching entry, or an empty Optional when nothing matches
     */
    @JvmStatic
    fun matchingKey(collection: Collection<String>, search: String): Optional<String> {
        val match: String? =
            if (search in collection) {
                search
            } else {
                collection.firstOrNull { candidate -> candidate.equals(search, ignoreCase = true) }
            }
        return Optional.ofNullable(match)
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
/* | ||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.util | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. old CDK's ConnectorExceptionUtil had a ton of other stuff in it. I just copied in the thing we're using. |
||
/** Holder for constants used when classifying connector errors. */
object ConnectorExceptionUtil {
    /** HTTP status codes (401 and 403) that are grouped together as authentication errors. */
    val HTTP_AUTHENTICATION_ERROR_CODES: List<Int> =
        listOf(
            401,
            403,
        )
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,8 +5,25 @@ | |
package io.airbyte.cdk.load.data | ||
|
||
import com.fasterxml.jackson.databind.JsonNode | ||
import io.airbyte.cdk.load.util.Jsons | ||
import io.github.oshai.kotlinlogging.KotlinLogging | ||
|
||
private val logger = KotlinLogging.logger {} | ||
|
||
sealed interface AirbyteType { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we use a sealed class instead of sealed interface since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done in yet-unpushed ef252ef |
||
/** | ||
* Utility method for database/warehouse destinations, which assume that the top-level schema is | ||
* an object. | ||
*/ | ||
fun asColumns(): LinkedHashMap<String, FieldType> { | ||
return linkedMapOf() | ||
} | ||
|
||
sealed interface AirbyteType | ||
val isObject: Boolean | ||
get() = false | ||
val isArray: Boolean | ||
get() = false | ||
} | ||
|
||
data object StringType : AirbyteType | ||
|
||
|
@@ -26,20 +43,91 @@ data object TimeTypeWithTimezone : AirbyteType | |
|
||
data object TimeTypeWithoutTimezone : AirbyteType | ||
|
||
data class ArrayType(val items: FieldType) : AirbyteType | ||
data class ArrayType(val items: FieldType) : AirbyteType { | ||
override val isArray = true | ||
} | ||
|
||
data object ArrayTypeWithoutSchema : AirbyteType { | ||
override val isArray = true | ||
} | ||
|
||
data object ArrayTypeWithoutSchema : AirbyteType | ||
data class ObjectType(val properties: LinkedHashMap<String, FieldType>) : AirbyteType { | ||
override fun asColumns(): LinkedHashMap<String, FieldType> { | ||
return properties | ||
} | ||
|
||
data class ObjectType(val properties: LinkedHashMap<String, FieldType>) : AirbyteType | ||
override val isObject = true | ||
} | ||
|
||
data object ObjectTypeWithEmptySchema : AirbyteType | ||
data object ObjectTypeWithEmptySchema : AirbyteType { | ||
override val isObject = true | ||
} | ||
|
||
data object ObjectTypeWithoutSchema : AirbyteType | ||
data object ObjectTypeWithoutSchema : AirbyteType { | ||
override val isObject = true | ||
} | ||
|
||
data class UnionType( | ||
val options: Set<AirbyteType>, | ||
val isLegacyUnion: Boolean, | ||
) : AirbyteType { | ||
/** | ||
* This is a hack to handle weird schemas like {type: [object, string]}. If a stream's top-level | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ugh. One day we will storm the sources castle and slay this beast |
||
* schema looks like this, we still want to be able to extract the object properties (i.e. treat | ||
* it as though the string option didn't exist). | ||
* | ||
     * Returns an empty map (after logging an error) if we cannot extract columns from this schema, i.e. when it has zero or multiple object options. | ||
*/ | ||
override fun asColumns(): LinkedHashMap<String, FieldType> { | ||
logger.warn { "asColumns options=$options" } | ||
val numObjectOptions = options.count { it.isObject } | ||
if (numObjectOptions > 1) { | ||
logger.error { "Can't extract columns from a schema with multiple object options" } | ||
return LinkedHashMap() | ||
} | ||
|
||
var retVal: LinkedHashMap<String, FieldType> | ||
try { | ||
retVal = options.first { it.isObject }.asColumns() | ||
} catch (_: NoSuchElementException) { | ||
logger.error { "Can't extract columns from a schema with no object options" } | ||
retVal = LinkedHashMap() | ||
} | ||
logger.warn { "Union.asColumns retVal=$retVal" } | ||
return retVal | ||
} | ||
|
||
/** | ||
* This matches legacy behavior. Some destinations handle legacy unions by choosing the "best" | ||
* type from amongst the options. This is... not great, but it would be painful to change. | ||
*/ | ||
fun chooseType(): AirbyteType { | ||
check(isLegacyUnion) { "Cannot chooseType for a non-legacy union type" } | ||
if (options.isEmpty()) { | ||
return UnknownType(Jsons.createObjectNode()) | ||
} | ||
return options.minBy { | ||
when (it) { | ||
is ArrayType, | ||
ArrayTypeWithoutSchema -> -2 | ||
is ObjectType, | ||
ObjectTypeWithEmptySchema, | ||
ObjectTypeWithoutSchema -> -1 | ||
StringType -> 0 | ||
DateType -> 1 | ||
TimeTypeWithoutTimezone -> 2 | ||
TimeTypeWithTimezone -> 3 | ||
TimestampTypeWithoutTimezone -> 4 | ||
TimestampTypeWithTimezone -> 5 | ||
NumberType -> 6 | ||
IntegerType -> 7 | ||
BooleanType -> 8 | ||
is UnknownType -> 9 | ||
is UnionType -> Int.MAX_VALUE | ||
} | ||
} | ||
} | ||
|
||
companion object { | ||
fun of(options: Set<AirbyteType>, isLegacyUnion: Boolean = false): AirbyteType { | ||
if (options.size == 1) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
/* | ||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.load.orchestration | ||
|
||
interface DestinationHandler { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is similar to the existing DestinationHandler interface, but trimmed down. The "initial status" stuff has been moved to DestinationInitialStatusGatherer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What a name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, this refers to the Destination not the Destination. I was confused There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is really a DatabaseDestinationHandler? also FYI the other db stuff has db in the namespace (cdk.load.db.orchestration) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll go with "database", I feel like context is sufficiently clear that it also refers to warehouses :P There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. renamed in (as yet unpushed) bafeb80 |
||
fun execute(sql: Sql) | ||
|
||
/** | ||
* Create the namespaces (typically something like `create schema`). | ||
* | ||
* This function should assume that all `namespaces` are valid identifiers, i.e. any special | ||
* characters have already been escaped, they respect identifier name length, etc. | ||
*/ | ||
suspend fun createNamespaces(namespaces: Collection<String>) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In practice there's never more than one namespace though? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there are :/
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
/* | ||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.load.orchestration | ||
|
||
import io.airbyte.cdk.load.command.DestinationStream | ||
|
||
interface DestinationInitialStatus | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done in yet-unpushed 02cc7cc |
||
|
||
/** | ||
* Some destinations can efficiently fetch multiple tables' information in a single query, so this | ||
* interface accepts multiple streams in a single method call. | ||
* | ||
* For destinations which do not support that optimization, a simpler implementation would be | ||
* something like this: | ||
* ```kotlin | ||
* streams.forEach { (stream, (tableNames, columnNames)) -> | ||
* launch { | ||
* // ... gather state... | ||
* } | ||
* } | ||
* ``` | ||
*/ | ||
fun interface DestinationInitialStatusGatherer<InitialStatus : DestinationInitialStatus> { | ||
suspend fun gatherInitialStatus( | ||
streams: Map<DestinationStream, Pair<TableNames, ColumnNameMapping>>, | ||
): Map<DestinationStream, InitialStatus> | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.load.orchestration | ||
|
||
import io.airbyte.cdk.load.command.DestinationStream | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this file supersedes the StreamConfig / StreamId / ColumnId stuff that the old CDK used. Mostly b/c the bulk CDK's DestinationStream is a clear replacement for StreamConfig. It's a bit less ergonomic, in that instead of passing a single StreamConfig around, we have to pass around tuples of DestinationStream x TableName(s) x ColumnNameMapping. But I kind of like the explicitness, that every method now only accepts exactly what it needs. |
||
/**
 * The raw and/or final table that a stream is written to.
 *
 * Either name may be absent, but not both. In theory we could have:
 * * old-style implementation: raw+final tables both exist
 * * only the raw table exists (i.e. T+D disabled)
 * * only the final table exists (i.e. new-style direct-load tables)
 *
 * @throws IllegalStateException on construction if both names are null
 */
data class TableNames(
    val rawTableName: TableName?,
    val finalTableName: TableName?,
) {
    init {
        val bothMissing = rawTableName == null && finalTableName == null
        check(!bothMissing) { "At least one table name should be nonnull" }
    }

    /**
     * Whether the raw table names conflict with each other, or the final table names conflict
     * with each other. Raw names are never compared against final names.
     */
    fun conflictsWith(other: TableNames): Boolean {
        if (rawTableName.conflictsWith(other.rawTableName)) {
            return true
        }
        return finalTableName.conflictsWith(other.finalTableName)
    }

    /** Human-readable rendering of both names; an absent name is rendered as `null`. */
    fun prettyPrint(): String {
        val rawDescription = rawTableName?.prettyPrint()
        val finalDescription = finalTableName?.prettyPrint()
        return "Raw table: $rawDescription; Final table: $finalDescription"
    }

    companion object {
        const val NO_SUFFIX = ""

        // TODO comment explaining this
        const val TMP_TABLE_SUFFIX = "_airbyte_tmp"
        const val SOFT_RESET_SUFFIX = "_ab_soft_reset"
    }
}
|
||
/** A table identified by its [namespace] and [name]. */
data class TableName(val namespace: String, val name: String) {
    /**
     * Render as `namespace.name`.
     *
     * @param quote string placed on both sides of the namespace and of the (suffixed) name
     * @param suffix string appended to the table name, inside the quotes
     */
    fun prettyPrint(quote: String = "", suffix: String = ""): String {
        val quotedNamespace = quote + namespace + quote
        val quotedName = quote + name + suffix + quote
        return "$quotedNamespace.$quotedName"
    }
}
|
||
fun TableName?.conflictsWith(other: TableName?): Boolean { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should mention the fact that this more specifically for a naming conflict. right now it's a bit unclear There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done in yet-unpushed b4268cf |
||
if (this == null || other == null) { | ||
return false | ||
} | ||
return this.namespace == other.namespace && this.name == other.name | ||
} | ||
|
||
/** | ||
* map from the column name as declared in the schema, to the column name that we'll create in the | ||
* final (typed) table. | ||
*/ | ||
@JvmInline | ||
value class ColumnNameMapping(private val columnNameMapping: Map<String, String>) : | ||
Map<String, String> by columnNameMapping | ||
|
||
sealed interface TableNameGenerator { | ||
fun getTableName(streamDescriptor: DestinationStream.Descriptor): TableName | ||
} | ||
|
||
fun interface RawTableNameGenerator : TableNameGenerator | ||
|
||
fun interface FinalTableNameGenerator : TableNameGenerator | ||
|
||
fun interface ColumnNameGenerator { | ||
/** | ||
* In some database/warehouses, there's a difference between how a name is _displayed_, and how | ||
* the underlying engine actually treats it. For example, a column might be displayed as | ||
* `CamelCaseColumn`, but the engine actually treats it as lowercase `camelcasecolumn`, or | ||
     * truncates it to `CamelCas`. BigQuery is an example of this: `create table foo (foo int, FOO | ||
* int)` is invalid, because `foo` is duplicated. | ||
* | ||
* This is relevant for handling collisions between column names. We need to know what name will | ||
* be displayed to the user, since that's what we'll use in queries - but we also need to know | ||
* the "canonical" name to check whether two columns will collide. | ||
*/ | ||
data class ColumnName(val displayName: String, val canonicalName: String) | ||
fun getColumnName(column: String): ColumnName | ||
} | ||
|
||
const val CDC_DELETED_AT_COLUMN = "_ab_cdc_deleted_at" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exact copypaste from old CDK.