Skip to content

Bigquery / Bulk Load CDK: implement T+D, truncate refresh, etc #57578

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 55 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
2ce873a
stashing WIP
edgao Apr 10, 2025
6e01e97
stashing WIP
edgao Apr 14, 2025
16c3cfd
stashing WIP
edgao Apr 14, 2025
0509cf7
stashing WIP
edgao Apr 14, 2025
80e1447
progress
edgao Apr 14, 2025
626fa83
rename
edgao Apr 14, 2025
7d718d0
rough direct-load sketch
edgao Apr 15, 2025
e399c3c
implement T+D framework
edgao Apr 15, 2025
c43d10b
fix stuff
edgao Apr 15, 2025
b529a83
progress
edgao Apr 15, 2025
0457865
derp
edgao Apr 15, 2025
063c79d
start porting to new iface
edgao Apr 15, 2025
c378068
progress on sqlgenerator
edgao Apr 15, 2025
7ffb0b8
progress
edgao Apr 15, 2025
4694dea
mostly done?
edgao Apr 15, 2025
31da2f9
ffs
edgao Apr 16, 2025
1f6f2c8
wow
edgao Apr 16, 2025
68b82ae
wire everything together
edgao Apr 17, 2025
c89574e
resources
edgao Apr 17, 2025
09d2497
less micronaut
edgao Apr 17, 2025
a852bef
kill TempUtils
edgao Apr 17, 2025
9e4b482
delete some code
edgao Apr 17, 2025
b47374d
make legacy tests mostly work
edgao Apr 17, 2025
f90b2b3
kill streamid here
edgao Apr 18, 2025
9e1a737
kill files
edgao Apr 18, 2025
b1c89f0
one usage
edgao Apr 18, 2025
f5df055
whoops
edgao Apr 18, 2025
b54cebd
copy AlterTableReport + CollectionUtils
edgao Apr 21, 2025
acbedf0
kill name transformer referencens
edgao Apr 21, 2025
a3fa044
column name stuff
edgao Apr 21, 2025
e148eca
last stuff?
edgao Apr 21, 2025
298d9bf
kill old CDK deps from runtime code
edgao Apr 21, 2025
9cf1fb3
clean up file
edgao Apr 21, 2025
1692493
derp
edgao Apr 21, 2025
d8401ff
uniqify
edgao Apr 21, 2025
dddb3f5
move to subpackage
edgao Apr 23, 2025
24d32fa
formatting
edgao Apr 23, 2025
bafeb80
rename destination -> databasehandler
edgao Apr 23, 2025
590a904
give this map a name
edgao Apr 23, 2025
ef252ef
airbytetype is abstract class now
edgao Apr 23, 2025
b4268cf
rename method
edgao Apr 23, 2025
97dce7a
rename method
edgao Apr 23, 2025
02cc7cc
rename class
edgao Apr 23, 2025
e2a1ea7
derp
edgao Apr 23, 2025
edfa43c
fix stuff
edgao Apr 23, 2025
f9a7794
pass state through?
edgao Apr 23, 2025
626a2f7
whoops
edgao Apr 23, 2025
7477f85
simplify raw table name gen
edgao Apr 24, 2025
3498570
update thread count
edgao Apr 24, 2025
22078b9
trimIndent
edgao Apr 24, 2025
522bd19
simplify impl
edgao Apr 24, 2025
c6c1f3a
use kotlin stuff
edgao Apr 24, 2025
b1bacbf
better string munge
edgao Apr 24, 2025
1db68c5
use QueryJobConfiguration.of
edgao Apr 24, 2025
2f8e95e
merge filters
edgao Apr 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.util

import java.util.Optional

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exact copypaste from old CDK.

object CollectionUtils {
/**
* Pass in a collection and search term to determine whether any of the values match ignoring
* case
*
* @param collection the collection of values
* @param search the value to look for
* @return whether the value matches anything in the collection
*/
@JvmStatic
fun containsIgnoreCase(collection: Collection<String>, search: String): Boolean {
return matchingKey(collection, search).isPresent
}

/**
* Convenience method for when you need to check an entire collection for membership in another
* collection.
*
* @param searchCollection the collection you want to check membership in
* @param searchTerms the keys you're looking for
* @return whether all searchTerms are in the searchCollection
*/
@JvmStatic
fun containsAllIgnoreCase(
searchCollection: Collection<String>,
searchTerms: Collection<String>
): Boolean {
require(!searchTerms.isEmpty()) {
// There isn't a good behavior for an empty collection. Without this check, an empty
// collection
// would always return
// true, but it feels misleading to say that the searchCollection does "contain all"
// when
// searchTerms is empty
"Search Terms collection may not be empty"
}
return searchTerms.all { term: String -> containsIgnoreCase(searchCollection, term) }
}

/**
* From a collection of strings, return an entry which matches the search term ignoring case
*
* @param collection the collection to search
* @param search the key you're looking for
* @return an Optional value which might contain the key that matches the search
*/
@JvmStatic
fun matchingKey(collection: Collection<String>, search: String): Optional<String> {
if (collection.contains(search)) {
return Optional.of(search)
}
return Optional.ofNullable(collection.firstOrNull { it.equals(search, ignoreCase = true) })
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.util

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

old CDK's ConnectorExceptionUtil had a ton of other stuff in it. I just copied in the thing we're using.

object ConnectorExceptionUtil {
val HTTP_AUTHENTICATION_ERROR_CODES: List<Int> = listOf(401, 403)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package io.airbyte.cdk.load.command
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.json.AirbyteTypeToJsonSchema
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
Expand Down Expand Up @@ -128,6 +129,9 @@ data class Dedupe(
/**
* theoretically, the path to the cursor. In practice, most destinations only support cursors at
* the root level, i.e. `listOf(cursorField)`.
*
* If this is set to an empty list, then the destination should use
* [DestinationRecord.message.record.emittedAt] as the cursor.
*/
val cursor: List<String>,
) : ImportType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,25 @@
package io.airbyte.cdk.load.data

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.load.util.Jsons
import io.github.oshai.kotlinlogging.KotlinLogging

private val logger = KotlinLogging.logger {}

sealed interface AirbyteType {
Copy link
Contributor

@frifriSF59 frifriSF59 Apr 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use a sealed class instead of sealed interface since AirbyteType doesn't need multiple inheritance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in yet-unpushed ef252ef

/**
* Utility method for database/warehouse destinations, which assume that the top-level schema is
* an object.
*/
fun asColumns(): LinkedHashMap<String, FieldType> {
return linkedMapOf()
}

sealed interface AirbyteType
val isObject: Boolean
get() = false
val isArray: Boolean
get() = false
}

data object StringType : AirbyteType

Expand All @@ -26,20 +43,91 @@ data object TimeTypeWithTimezone : AirbyteType

data object TimeTypeWithoutTimezone : AirbyteType

data class ArrayType(val items: FieldType) : AirbyteType
data class ArrayType(val items: FieldType) : AirbyteType {
override val isArray = true
}

data object ArrayTypeWithoutSchema : AirbyteType {
override val isArray = true
}

data object ArrayTypeWithoutSchema : AirbyteType
data class ObjectType(val properties: LinkedHashMap<String, FieldType>) : AirbyteType {
override fun asColumns(): LinkedHashMap<String, FieldType> {
return properties
}

data class ObjectType(val properties: LinkedHashMap<String, FieldType>) : AirbyteType
override val isObject = true
}

data object ObjectTypeWithEmptySchema : AirbyteType
data object ObjectTypeWithEmptySchema : AirbyteType {
override val isObject = true
}

data object ObjectTypeWithoutSchema : AirbyteType
data object ObjectTypeWithoutSchema : AirbyteType {
override val isObject = true
}

data class UnionType(
val options: Set<AirbyteType>,
val isLegacyUnion: Boolean,
) : AirbyteType {
/**
* This is a hack to handle weird schemas like {type: [object, string]}. If a stream's top-level
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh. One day we will storm the sources castle and slay this beast

* schema looks like this, we still want to be able to extract the object properties (i.e. treat
* it as though the string option didn't exist).
*
* @throws IllegalArgumentException if we cannot extract columns from this schema
*/
override fun asColumns(): LinkedHashMap<String, FieldType> {
logger.warn { "asColumns options=$options" }
val numObjectOptions = options.count { it.isObject }
if (numObjectOptions > 1) {
logger.error { "Can't extract columns from a schema with multiple object options" }
return LinkedHashMap()
}

var retVal: LinkedHashMap<String, FieldType>
try {
retVal = options.first { it.isObject }.asColumns()
} catch (_: NoSuchElementException) {
logger.error { "Can't extract columns from a schema with no object options" }
retVal = LinkedHashMap()
}
logger.warn { "Union.asColumns retVal=$retVal" }
return retVal
}

/**
* This matches legacy behavior. Some destinations handle legacy unions by choosing the "best"
* type from amongst the options. This is... not great, but it would be painful to change.
*/
fun chooseType(): AirbyteType {
check(isLegacyUnion) { "Cannot chooseType for a non-legacy union type" }
if (options.isEmpty()) {
return UnknownType(Jsons.createObjectNode())
}
return options.minBy {
when (it) {
is ArrayType,
ArrayTypeWithoutSchema -> -2
is ObjectType,
ObjectTypeWithEmptySchema,
ObjectTypeWithoutSchema -> -1
StringType -> 0
DateType -> 1
TimeTypeWithoutTimezone -> 2
TimeTypeWithTimezone -> 3
TimestampTypeWithoutTimezone -> 4
TimestampTypeWithTimezone -> 5
NumberType -> 6
IntegerType -> 7
BooleanType -> 8
is UnknownType -> 9
is UnionType -> Int.MAX_VALUE
}
}
}

companion object {
fun of(options: Set<AirbyteType>, isLegacyUnion: Boolean = false): AirbyteType {
if (options.size == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ data class Meta(
COLUMN_NAME_AB_GENERATION_ID,
)

/** A legacy column name. Used in "DV2" destinations' raw tables. */
const val COLUMN_NAME_AB_LOADED_AT = "_airbyte_loaded_at"

fun getMetaValue(metaColumnName: String, value: String): AirbyteValue {
if (!COLUMN_NAMES.contains(metaColumnName)) {
throw IllegalArgumentException("Invalid meta column name: $metaColumnName")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.orchestration

interface DestinationHandler {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is similar to the existing DestinationHandler interface, but trimmed down. The "initial status" stuff has ben moved to DestinationInitialStatusGatherer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What a name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this refers to the Destination not the Destination. I was confused

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really a DatabaseDestinationHandler?

also FYI the other db stuff has db in the namespace (cdk.load.db.orchestration)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or StructuredDataStoreHandler if you want to get really pedantic, since it will also cover warehouses.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll go with "database", I feel like context is sufficiently clear that it also refers to warehouses :P

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed in (as yet unpushed) bafeb80

fun execute(sql: Sql)

/**
* Create the namespaces (typically something like `create schema`).
*
* This function should assume that all `namespaces` are valid identifiers, i.e. any special
* characters have already been escaped, they respect identifier name length, etc.
*/
suspend fun createNamespaces(namespaces: Collection<String>)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice there's never more than one namespace though?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are :/

  • T+D: we have the raw table namespace (typically airbyte_internal) + any source-defined namespace(s)
  • and in general: DB sources sometimes have multiple namespaces (e.g. pulling from multiple Postgres schemas)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.orchestration

import io.airbyte.cdk.load.command.DestinationStream

interface DestinationInitialStatus
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again Database- or StructuredDataStoreInitialStatus

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in yet-unpushed 02cc7cc


/**
* Some destinations can efficiently fetch multiple tables' information in a single query, so this
* interface accepts multiple streams in a single method call.
*
* For destinations which do not support that optimization, a simpler implementation would be
* something like this:
* ```kotlin
* streams.forEach { (stream, (tableNames, columnNames)) ->
* launch {
* // ... gather state...
* }
* }
* ```
*/
fun interface DestinationInitialStatusGatherer<InitialStatus : DestinationInitialStatus> {
suspend fun gatherInitialStatus(
streams: Map<DestinationStream, Pair<TableNames, ColumnNameMapping>>,
): Map<DestinationStream, InitialStatus>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.orchestration

import io.airbyte.cdk.load.command.DestinationStream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file supersedes the StreamConfig / StreamId / ColumnId stuff that the old CDK used. Mostly b/c the bulk CDK's DestinationStream is a clear replacement for StreamConfig.

It's a bit less ergonomic, in that instead of passing a single StreamConfig around, we have to pass around tuples of DestinationStream x TableName(s) x ColumnNameMapping. But I kind of like the explicitness, that every method now only accepts exactly what it needs.

data class TableNames(
// this is pretty dumb, but in theory we could have:
// * old-style implementation: raw+final tables both exist
// * only the raw table exists (i.e. T+D disabled)
// * only the final table exists (i.e. new-style direct-load tables)
val rawTableName: TableName?,
val finalTableName: TableName?,
) {
init {
check(rawTableName != null || finalTableName != null) {
"At least one table name should be nonnull"
}
}

fun conflictsWith(other: TableNames) =
this.rawTableName.conflictsWith(other.rawTableName) ||
this.finalTableName.conflictsWith(other.finalTableName)

fun prettyPrint() =
"Raw table: ${rawTableName?.prettyPrint()}; Final table: ${finalTableName?.prettyPrint()}"

companion object {
const val NO_SUFFIX = ""
// TODO comment explaining this
const val TMP_TABLE_SUFFIX = "_airbyte_tmp"
const val SOFT_RESET_SUFFIX = "_ab_soft_reset"
}
}

data class TableName(val namespace: String, val name: String) {
fun prettyPrint(quote: String = "", suffix: String = "") =
"$quote$namespace$quote.$quote$name$suffix$quote"
}

fun TableName?.conflictsWith(other: TableName?): Boolean {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should mention the fact that this more specifically for a naming conflict. right now it's a bit unclear

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in yet-unpushed b4268cf

if (this == null || other == null) {
return false
}
return this.namespace == other.namespace && this.name == other.name
}

/**
* map from the column name as declared in the schema, to the column name that we'll create in the
* final (typed) table.
*/
@JvmInline
value class ColumnNameMapping(private val columnNameMapping: Map<String, String>) :
Map<String, String> by columnNameMapping

sealed interface TableNameGenerator {
fun getTableName(streamDescriptor: DestinationStream.Descriptor): TableName
}

fun interface RawTableNameGenerator : TableNameGenerator

fun interface FinalTableNameGenerator : TableNameGenerator

fun interface ColumnNameGenerator {
/**
* In some database/warehouses, there's a difference between how a name is _displayed_, and how
* the underlying engine actually treats it. For example, a column might be displayed as
* `CamelCaseColumn`, but the engine actually treats it as lowercase `camelcasecolumn`, or
* truncate it to `CamelCas`. Bigquery is an example of this: `create table foo (foo int, FOO
* int)` is invalid, because `foo` is duplicated.
*
* This is relevant for handling collisions between column names. We need to know what name will
* be displayed to the user, since that's what we'll use in queries - but we also need to know
* the "canonical" name to check whether two columns will collide.
*/
data class ColumnName(val displayName: String, val canonicalName: String)
fun getColumnName(column: String): ColumnName
}

const val CDC_DELETED_AT_COLUMN = "_ab_cdc_deleted_at"
Loading
Loading