Skip to content

Commit ef4d48d

Browse files
edgaoxiaohansong
authored andcommitted
CDK: Destinations: Backport CDK fixes for redshift (#40499)
1 parent 7a6167e commit ef4d48d

File tree

5 files changed

+87
-45
lines changed

5 files changed

+87
-45
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,10 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177-
| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
177+
| 0.40.8 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
178+
| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
179+
| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
180+
| ~~0.40.6~~ | | | (this version does not exist) |
178181
| 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
179182
| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
180183
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.40.7
1+
version=0.40.8

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode
77
import com.fasterxml.jackson.databind.node.ObjectNode
88
import io.airbyte.cdk.db.jdbc.JdbcDatabase
99
import io.airbyte.cdk.integrations.base.JavaBaseConstants
10+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
1011
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
1112
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
1213
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
@@ -50,7 +51,8 @@ abstract class JdbcDestinationHandler<DestinationState>(
5051
protected val catalogName: String?,
5152
protected val jdbcDatabase: JdbcDatabase,
5253
protected val rawTableNamespace: String,
53-
private val dialect: SQLDialect
54+
private val dialect: SQLDialect,
55+
private val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION,
5456
) : DestinationHandler<DestinationState> {
5557
protected val dslContext: DSLContext
5658
get() = DSL.using(dialect)
@@ -363,6 +365,14 @@ abstract class JdbcDestinationHandler<DestinationState>(
363365
)
364366
}
365367

368+
protected open fun isAirbyteGenerationColumnMatch(existingTable: TableDefinition): Boolean {
369+
return toJdbcTypeName(AirbyteProtocolType.INTEGER)
370+
.equals(
371+
existingTable.columns.getValue(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID).type,
372+
ignoreCase = true,
373+
)
374+
}
375+
366376
open protected fun existingSchemaMatchesStreamConfig(
367377
stream: StreamConfig?,
368378
existingTable: TableDefinition
@@ -375,7 +385,11 @@ abstract class JdbcDestinationHandler<DestinationState>(
375385
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
376386
) && isAirbyteExtractedAtColumnMatch(existingTable)) ||
377387
!(existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_META) &&
378-
isAirbyteMetaColumnMatch(existingTable))
388+
isAirbyteMetaColumnMatch(existingTable)) ||
389+
(columns == DestinationColumns.V2_WITH_GENERATION &&
390+
!(existingTable.columns.containsKey(
391+
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
392+
) && isAirbyteGenerationColumnMatch(existingTable)))
379393
) {
380394
// Missing AB meta columns from final table, we need them to do proper T+D so trigger
381395
// soft-reset

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt

Lines changed: 53 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping
55

66
import com.google.common.annotations.VisibleForTesting
77
import io.airbyte.cdk.integrations.base.JavaBaseConstants
8+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
89
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
910
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
1011
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
@@ -23,12 +24,11 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
2324
import io.airbyte.protocol.models.v0.DestinationSyncMode
2425
import java.sql.Timestamp
2526
import java.time.Instant
26-
import java.util.*
27-
import kotlin.Any
28-
import kotlin.Boolean
29-
import kotlin.IllegalArgumentException
27+
import java.util.Locale
28+
import java.util.Optional
3029
import kotlin.Int
3130
import org.jooq.Condition
31+
import org.jooq.CreateTableColumnStep
3232
import org.jooq.DSLContext
3333
import org.jooq.DataType
3434
import org.jooq.Field
@@ -37,6 +37,7 @@ import org.jooq.Name
3737
import org.jooq.Record
3838
import org.jooq.SQLDialect
3939
import org.jooq.SelectConditionStep
40+
import org.jooq.SelectFieldOrAsterisk
4041
import org.jooq.conf.ParamType
4142
import org.jooq.impl.DSL
4243
import org.jooq.impl.SQLDataType
@@ -45,7 +46,9 @@ abstract class JdbcSqlGenerator
4546
@JvmOverloads
4647
constructor(
4748
protected val namingTransformer: NamingConventionTransformer,
48-
private val cascadeDrop: Boolean = false
49+
private val cascadeDrop: Boolean = false,
50+
@VisibleForTesting
51+
internal val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION,
4952
) : SqlGenerator {
5053
protected val cdcDeletedAtColumn: ColumnId = buildColumnId("_ab_cdc_deleted_at")
5154

@@ -199,6 +202,9 @@ constructor(
199202
SQLDataType.VARCHAR(36).nullable(false)
200203
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT] =
201204
timestampWithTimeZoneType.nullable(false)
205+
if (columns == DestinationColumns.V2_WITH_GENERATION) {
206+
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID] = SQLDataType.BIGINT
207+
}
202208
if (includeMetaColumn)
203209
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_META] = structType.nullable(false)
204210
return metaColumns
@@ -332,38 +338,50 @@ constructor(
332338
rawTableName: Name,
333339
namespace: String,
334340
tableName: String
335-
) =
336-
dslContext
337-
.createTable(rawTableName)
338-
.column(
339-
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
340-
SQLDataType.VARCHAR(36).nullable(false),
341-
)
342-
.column(
343-
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
344-
timestampWithTimeZoneType.nullable(false),
345-
)
346-
.column(
347-
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
348-
timestampWithTimeZoneType.nullable(true),
349-
)
350-
.column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false))
351-
.column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true))
352-
.`as`(
353-
DSL.select(
354-
DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID)
355-
.`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID),
356-
DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
357-
.`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT),
358-
DSL.cast(null, timestampWithTimeZoneType)
359-
.`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT),
360-
DSL.field(JavaBaseConstants.COLUMN_NAME_DATA)
361-
.`as`(JavaBaseConstants.COLUMN_NAME_DATA),
362-
DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META),
363-
)
364-
.from(DSL.table(DSL.name(namespace, tableName))),
341+
): String {
342+
val hasGenerationId = columns == DestinationColumns.V2_WITH_GENERATION
343+
344+
val createTable: CreateTableColumnStep =
345+
dslContext
346+
.createTable(rawTableName)
347+
.column(
348+
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
349+
SQLDataType.VARCHAR(36).nullable(false),
350+
)
351+
.column(
352+
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
353+
timestampWithTimeZoneType.nullable(false),
354+
)
355+
.column(
356+
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
357+
timestampWithTimeZoneType.nullable(true),
358+
)
359+
.column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false))
360+
.column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true))
361+
if (hasGenerationId) {
362+
createTable.column(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, SQLDataType.BIGINT)
363+
}
364+
365+
val selectColumns: MutableList<SelectFieldOrAsterisk> =
366+
mutableListOf(
367+
DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID)
368+
.`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID),
369+
DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
370+
.`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT),
371+
DSL.cast(null, timestampWithTimeZoneType)
372+
.`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT),
373+
DSL.field(JavaBaseConstants.COLUMN_NAME_DATA)
374+
.`as`(JavaBaseConstants.COLUMN_NAME_DATA),
375+
DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META),
365376
)
377+
if (hasGenerationId) {
378+
selectColumns += DSL.value(0).`as`(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
379+
}
380+
381+
return createTable
382+
.`as`(DSL.select(selectColumns).from(DSL.table(DSL.name(namespace, tableName))))
366383
.getSQL(ParamType.INLINED)
384+
}
367385

368386
override fun clearLoadedAt(streamId: StreamId): Sql {
369387
return of(

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import com.fasterxml.jackson.databind.JsonNode
77
import io.airbyte.cdk.db.jdbc.JdbcDatabase
88
import io.airbyte.cdk.integrations.base.JavaBaseConstants
99
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
10+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
1011
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_ID
1112
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT
1213
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META
1314
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID
1415
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA
1516
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT
17+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
1618
import io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS
1719
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator
1820
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
@@ -90,16 +92,18 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
9092

9193
@Throws(Exception::class)
9294
override fun createRawTable(streamId: StreamId) {
93-
database.execute(
95+
val columns =
9496
dslContext
9597
.createTable(DSL.name(streamId.rawNamespace, streamId.rawName))
9698
.column(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false))
9799
.column(COLUMN_NAME_AB_EXTRACTED_AT, timestampWithTimeZoneType.nullable(false))
98100
.column(COLUMN_NAME_AB_LOADED_AT, timestampWithTimeZoneType)
99101
.column(COLUMN_NAME_DATA, structType.nullable(false))
100102
.column(COLUMN_NAME_AB_META, structType.nullable(true))
101-
.getSQL(ParamType.INLINED)
102-
)
103+
if (sqlGenerator.columns == DestinationColumns.V2_WITH_GENERATION) {
104+
columns.column(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, SQLDataType.BIGINT)
105+
}
106+
database.execute(columns.getSQL(ParamType.INLINED))
103107
}
104108

105109
@Throws(Exception::class)
@@ -118,7 +122,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
118122
public override fun insertRawTableRecords(streamId: StreamId, records: List<JsonNode>) {
119123
insertRecords(
120124
DSL.name(streamId.rawNamespace, streamId.rawName),
121-
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES,
125+
sqlGenerator.columns.rawColumns,
122126
records,
123127
COLUMN_NAME_DATA,
124128
COLUMN_NAME_AB_META
@@ -143,9 +147,12 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
143147
records: List<JsonNode>,
144148
generationId: Long,
145149
) {
146-
// TODO handle generation ID
147150
val columnNames =
148-
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
151+
(if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES)
152+
.toMutableList()
153+
if (sqlGenerator.columns == DestinationColumns.V2_WITH_GENERATION) {
154+
columnNames += COLUMN_NAME_AB_GENERATION_ID
155+
}
149156
insertRecords(
150157
DSL.name(streamId.finalNamespace, streamId.finalName + suffix),
151158
columnNames,

0 commit comments

Comments
 (0)