Skip to content

Commit ec485f1

Browse files
committed
Bigquery / Bulk Load CDK: implement T+D, truncate refresh, etc (#57578)
1 parent adcac71 commit ec485f1

File tree

60 files changed

+3175
-4034
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+3175
-4034
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.util
6+
7+
import java.util.Optional
8+
9+
object CollectionUtils {
10+
/**
11+
* Pass in a collection and search term to determine whether any of the values match ignoring
12+
* case
13+
*
14+
* @param collection the collection of values
15+
* @param search the value to look for
16+
* @return whether the value matches anything in the collection
17+
*/
18+
@JvmStatic
19+
fun containsIgnoreCase(collection: Collection<String>, search: String): Boolean {
20+
return matchingKey(collection, search).isPresent
21+
}
22+
23+
/**
24+
* Convenience method for when you need to check an entire collection for membership in another
25+
* collection.
26+
*
27+
* @param searchCollection the collection you want to check membership in
28+
* @param searchTerms the keys you're looking for
29+
* @return whether all searchTerms are in the searchCollection
30+
*/
31+
@JvmStatic
32+
fun containsAllIgnoreCase(
33+
searchCollection: Collection<String>,
34+
searchTerms: Collection<String>
35+
): Boolean {
36+
require(!searchTerms.isEmpty()) {
37+
// There isn't a good behavior for an empty collection. Without this check, an empty
38+
// collection
39+
// would always return
40+
// true, but it feels misleading to say that the searchCollection does "contain all"
41+
// when
42+
// searchTerms is empty
43+
"Search Terms collection may not be empty"
44+
}
45+
return searchTerms.all { term: String -> containsIgnoreCase(searchCollection, term) }
46+
}
47+
48+
/**
49+
* From a collection of strings, return an entry which matches the search term ignoring case
50+
*
51+
* @param collection the collection to search
52+
* @param search the key you're looking for
53+
* @return an Optional value which might contain the key that matches the search
54+
*/
55+
@JvmStatic
56+
fun matchingKey(collection: Collection<String>, search: String): Optional<String> {
57+
if (collection.contains(search)) {
58+
return Optional.of(search)
59+
}
60+
return Optional.ofNullable(collection.firstOrNull { it.equals(search, ignoreCase = true) })
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.util
6+
7+
object ConnectorExceptionUtil {
8+
val HTTP_AUTHENTICATION_ERROR_CODES: List<Int> = listOf(401, 403)
9+
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationStream.kt

+4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package io.airbyte.cdk.load.command
77
import io.airbyte.cdk.load.data.AirbyteType
88
import io.airbyte.cdk.load.data.json.AirbyteTypeToJsonSchema
99
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
10+
import io.airbyte.cdk.load.message.DestinationRecord
1011
import io.airbyte.protocol.models.v0.AirbyteStream
1112
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
1213
import io.airbyte.protocol.models.v0.DestinationSyncMode
@@ -136,6 +137,9 @@ data class Dedupe(
136137
/**
137138
* theoretically, the path to the cursor. In practice, most destinations only support cursors at
138139
* the root level, i.e. `listOf(cursorField)`.
140+
*
141+
* If this is set to an empty list, then the destination should use
142+
* [DestinationRecord.message.record.emittedAt] as the cursor.
139143
*/
140144
val cursor: List<String>,
141145
) : ImportType

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteType.kt

+103-17
Original file line numberDiff line numberDiff line change
@@ -5,41 +5,127 @@
55
package io.airbyte.cdk.load.data
66

77
import com.fasterxml.jackson.databind.JsonNode
8+
import io.airbyte.cdk.load.util.Jsons
9+
import io.github.oshai.kotlinlogging.KotlinLogging
10+
11+
private val logger = KotlinLogging.logger {}
12+
13+
sealed class AirbyteType {
14+
/**
15+
* Utility method for database/warehouse destinations, which assume that the top-level schema is
16+
* an object.
17+
*/
18+
open fun asColumns(): LinkedHashMap<String, FieldType> {
19+
return linkedMapOf()
20+
}
821

9-
sealed interface AirbyteType
22+
open val isObject: Boolean = false
23+
open val isArray: Boolean = false
24+
}
1025

11-
data object StringType : AirbyteType
26+
data object StringType : AirbyteType()
1227

13-
data object BooleanType : AirbyteType
28+
data object BooleanType : AirbyteType()
1429

15-
data object IntegerType : AirbyteType
30+
data object IntegerType : AirbyteType()
1631

17-
data object NumberType : AirbyteType
32+
data object NumberType : AirbyteType()
1833

19-
data object DateType : AirbyteType
34+
data object DateType : AirbyteType()
2035

21-
data object TimestampTypeWithTimezone : AirbyteType
36+
data object TimestampTypeWithTimezone : AirbyteType()
2237

23-
data object TimestampTypeWithoutTimezone : AirbyteType
38+
data object TimestampTypeWithoutTimezone : AirbyteType()
2439

25-
data object TimeTypeWithTimezone : AirbyteType
40+
data object TimeTypeWithTimezone : AirbyteType()
2641

27-
data object TimeTypeWithoutTimezone : AirbyteType
42+
data object TimeTypeWithoutTimezone : AirbyteType()
2843

29-
data class ArrayType(val items: FieldType) : AirbyteType
44+
data class ArrayType(val items: FieldType) : AirbyteType() {
45+
override val isArray = true
46+
}
47+
48+
data object ArrayTypeWithoutSchema : AirbyteType() {
49+
override val isArray = true
50+
}
3051

31-
data object ArrayTypeWithoutSchema : AirbyteType
52+
data class ObjectType(val properties: LinkedHashMap<String, FieldType>) : AirbyteType() {
53+
override fun asColumns(): LinkedHashMap<String, FieldType> {
54+
return properties
55+
}
3256

33-
data class ObjectType(val properties: LinkedHashMap<String, FieldType>) : AirbyteType
57+
override val isObject = true
58+
}
3459

35-
data object ObjectTypeWithEmptySchema : AirbyteType
60+
data object ObjectTypeWithEmptySchema : AirbyteType() {
61+
override val isObject = true
62+
}
3663

37-
data object ObjectTypeWithoutSchema : AirbyteType
64+
data object ObjectTypeWithoutSchema : AirbyteType() {
65+
override val isObject = true
66+
}
3867

3968
data class UnionType(
4069
val options: Set<AirbyteType>,
4170
val isLegacyUnion: Boolean,
42-
) : AirbyteType {
71+
) : AirbyteType() {
72+
/**
73+
* This is a hack to handle weird schemas like {type: [object, string]}. If a stream's top-level
74+
* schema looks like this, we still want to be able to extract the object properties (i.e. treat
75+
* it as though the string option didn't exist).
76+
*
77+
* @throws IllegalArgumentException if we cannot extract columns from this schema
78+
*/
79+
override fun asColumns(): LinkedHashMap<String, FieldType> {
80+
logger.warn { "asColumns options=$options" }
81+
val numObjectOptions = options.count { it.isObject }
82+
if (numObjectOptions > 1) {
83+
logger.error { "Can't extract columns from a schema with multiple object options" }
84+
return LinkedHashMap()
85+
}
86+
87+
var retVal: LinkedHashMap<String, FieldType>
88+
try {
89+
retVal = options.first { it.isObject }.asColumns()
90+
} catch (_: NoSuchElementException) {
91+
logger.error { "Can't extract columns from a schema with no object options" }
92+
retVal = LinkedHashMap()
93+
}
94+
logger.warn { "Union.asColumns retVal=$retVal" }
95+
return retVal
96+
}
97+
98+
/**
99+
* This matches legacy behavior. Some destinations handle legacy unions by choosing the "best"
100+
* type from amongst the options. This is... not great, but it would be painful to change.
101+
*/
102+
fun chooseType(): AirbyteType {
103+
check(isLegacyUnion) { "Cannot chooseType for a non-legacy union type" }
104+
if (options.isEmpty()) {
105+
return UnknownType(Jsons.createObjectNode())
106+
}
107+
return options.minBy {
108+
when (it) {
109+
is ArrayType,
110+
ArrayTypeWithoutSchema -> -2
111+
is ObjectType,
112+
ObjectTypeWithEmptySchema,
113+
ObjectTypeWithoutSchema -> -1
114+
StringType -> 0
115+
DateType -> 1
116+
TimeTypeWithoutTimezone -> 2
117+
TimeTypeWithTimezone -> 3
118+
TimestampTypeWithoutTimezone -> 4
119+
TimestampTypeWithTimezone -> 5
120+
NumberType -> 6
121+
IntegerType -> 7
122+
BooleanType -> 8
123+
is UnknownType -> 9
124+
is UnionType -> Int.MAX_VALUE
125+
}
126+
}
127+
}
128+
43129
companion object {
44130
fun of(options: Set<AirbyteType>, isLegacyUnion: Boolean = false): AirbyteType {
45131
if (options.size == 1) {
@@ -56,6 +142,6 @@ data class UnionType(
56142
}
57143
}
58144

59-
data class UnknownType(val schema: JsonNode) : AirbyteType
145+
data class UnknownType(val schema: JsonNode) : AirbyteType()
60146

61147
data class FieldType(val type: AirbyteType, val nullable: Boolean)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

+3
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ data class Meta(
128128
COLUMN_NAME_AB_GENERATION_ID,
129129
)
130130

131+
/** A legacy column name. Used in "DV2" destinations' raw tables. */
132+
const val COLUMN_NAME_AB_LOADED_AT = "_airbyte_loaded_at"
133+
131134
fun getMetaValue(metaColumnName: String, value: String): AirbyteValue {
132135
if (!COLUMN_NAMES.contains(metaColumnName)) {
133136
throw IllegalArgumentException("Invalid meta column name: $metaColumnName")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.orchestration.db
6+
7+
interface DatabaseHandler {
8+
fun execute(sql: Sql)
9+
10+
/**
11+
* Create the namespaces (typically something like `create schema`).
12+
*
13+
* This function should assume that all `namespaces` are valid identifiers, i.e. any special
14+
* characters have already been escaped, they respect identifier name length, etc.
15+
*/
16+
suspend fun createNamespaces(namespaces: Collection<String>)
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.orchestration.db
6+
7+
import io.airbyte.cdk.load.command.DestinationStream
8+
import io.airbyte.cdk.load.orchestration.db.legacy_typing_deduping.TableCatalog
9+
10+
interface DatabaseInitialStatus
11+
12+
/**
13+
* Some destinations can efficiently fetch multiple tables' information in a single query, so this
14+
* interface accepts multiple streams in a single method call.
15+
*
16+
* For destinations which do not support that optimization, a simpler implementation would be
17+
* something like this:
18+
* ```kotlin
19+
* streams.forEach { (stream, (tableNames, columnNames)) ->
20+
* launch {
21+
* // ... gather state...
22+
* }
23+
* }
24+
* ```
25+
*/
26+
fun interface DatabaseInitialStatusGatherer<InitialStatus : DatabaseInitialStatus> {
27+
suspend fun gatherInitialStatus(streams: TableCatalog): Map<DestinationStream, InitialStatus>
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.orchestration.db
6+
7+
import io.airbyte.cdk.load.command.DestinationStream
8+
9+
data class TableNames(
10+
// this is pretty dumb, but in theory we could have:
11+
// * old-style implementation: raw+final tables both exist
12+
// * only the raw table exists (i.e. T+D disabled)
13+
// * only the final table exists (i.e. new-style direct-load tables)
14+
val rawTableName: TableName?,
15+
val finalTableName: TableName?,
16+
) {
17+
init {
18+
check(rawTableName != null || finalTableName != null) {
19+
"At least one table name should be nonnull"
20+
}
21+
}
22+
23+
fun hasNamingConflictWith(other: TableNames) =
24+
this.rawTableName.hasNamingConflictWith(other.rawTableName) ||
25+
this.finalTableName.hasNamingConflictWith(other.finalTableName)
26+
27+
fun toPrettyString() =
28+
"Raw table: ${rawTableName?.toPrettyString()}; Final table: ${finalTableName?.toPrettyString()}"
29+
30+
companion object {
31+
const val NO_SUFFIX = ""
32+
// TODO comment explaining this
33+
const val TMP_TABLE_SUFFIX = "_airbyte_tmp"
34+
const val SOFT_RESET_SUFFIX = "_ab_soft_reset"
35+
}
36+
}
37+
38+
data class TableName(val namespace: String, val name: String) {
39+
fun toPrettyString(quote: String = "", suffix: String = "") =
40+
"$quote$namespace$quote.$quote$name$suffix$quote"
41+
}
42+
43+
fun TableName?.hasNamingConflictWith(other: TableName?): Boolean {
44+
if (this == null || other == null) {
45+
return false
46+
}
47+
return this.namespace == other.namespace && this.name == other.name
48+
}
49+
50+
/**
51+
* map from the column name as declared in the schema, to the column name that we'll create in the
52+
* final (typed) table.
53+
*/
54+
@JvmInline
55+
value class ColumnNameMapping(private val columnNameMapping: Map<String, String>) :
56+
Map<String, String> by columnNameMapping
57+
58+
sealed interface TableNameGenerator {
59+
fun getTableName(streamDescriptor: DestinationStream.Descriptor): TableName
60+
}
61+
62+
fun interface RawTableNameGenerator : TableNameGenerator
63+
64+
fun interface FinalTableNameGenerator : TableNameGenerator
65+
66+
fun interface ColumnNameGenerator {
67+
/**
68+
* In some database/warehouses, there's a difference between how a name is _displayed_, and how
69+
* the underlying engine actually treats it. For example, a column might be displayed as
70+
* `CamelCaseColumn`, but the engine actually treats it as lowercase `camelcasecolumn`, or
71+
* truncate it to `CamelCas`. Bigquery is an example of this: `create table foo (foo int, FOO
72+
* int)` is invalid, because `foo` is duplicated.
73+
*
74+
* This is relevant for handling collisions between column names. We need to know what name will
75+
* be displayed to the user, since that's what we'll use in queries - but we also need to know
76+
* the "canonical" name to check whether two columns will collide.
77+
*/
78+
data class ColumnName(val displayName: String, val canonicalName: String)
79+
fun getColumnName(column: String): ColumnName
80+
}
81+
82+
const val CDC_DELETED_AT_COLUMN = "_ab_cdc_deleted_at"

0 commit comments

Comments
 (0)