@@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping
5
5
6
6
import com.google.common.annotations.VisibleForTesting
7
7
import io.airbyte.cdk.integrations.base.JavaBaseConstants
8
+ import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
8
9
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
9
10
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
10
11
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
@@ -23,12 +24,11 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
23
24
import io.airbyte.protocol.models.v0.DestinationSyncMode
24
25
import java.sql.Timestamp
25
26
import java.time.Instant
26
- import java.util.*
27
- import kotlin.Any
28
- import kotlin.Boolean
29
- import kotlin.IllegalArgumentException
27
+ import java.util.Locale
28
+ import java.util.Optional
30
29
import kotlin.Int
31
30
import org.jooq.Condition
31
+ import org.jooq.CreateTableColumnStep
32
32
import org.jooq.DSLContext
33
33
import org.jooq.DataType
34
34
import org.jooq.Field
@@ -37,6 +37,7 @@ import org.jooq.Name
37
37
import org.jooq.Record
38
38
import org.jooq.SQLDialect
39
39
import org.jooq.SelectConditionStep
40
+ import org.jooq.SelectFieldOrAsterisk
40
41
import org.jooq.conf.ParamType
41
42
import org.jooq.impl.DSL
42
43
import org.jooq.impl.SQLDataType
@@ -45,7 +46,8 @@ abstract class JdbcSqlGenerator
45
46
@JvmOverloads
46
47
constructor (
47
48
protected val namingTransformer: NamingConventionTransformer ,
48
- private val cascadeDrop: Boolean = false
49
+ private val cascadeDrop: Boolean = false ,
50
+ private val columns: DestinationColumns = DestinationColumns .V2_WITH_GENERATION ,
49
51
) : SqlGenerator {
50
52
protected val cdcDeletedAtColumn: ColumnId = buildColumnId(" _ab_cdc_deleted_at" )
51
53
@@ -199,6 +201,9 @@ constructor(
199
201
SQLDataType .VARCHAR (36 ).nullable(false )
200
202
metaColumns[JavaBaseConstants .COLUMN_NAME_AB_EXTRACTED_AT ] =
201
203
timestampWithTimeZoneType.nullable(false )
204
+ if (columns == DestinationColumns .V2_WITH_GENERATION ) {
205
+ metaColumns[JavaBaseConstants .COLUMN_NAME_AB_GENERATION_ID ] = SQLDataType .BIGINT
206
+ }
202
207
if (includeMetaColumn)
203
208
metaColumns[JavaBaseConstants .COLUMN_NAME_AB_META ] = structType.nullable(false )
204
209
return metaColumns
@@ -332,38 +337,51 @@ constructor(
332
337
rawTableName : Name ,
333
338
namespace : String ,
334
339
tableName : String
335
- ) =
336
- dslContext
337
- .createTable(rawTableName)
338
- .column(
339
- JavaBaseConstants .COLUMN_NAME_AB_RAW_ID ,
340
- SQLDataType .VARCHAR (36 ).nullable(false ),
341
- )
342
- .column(
343
- JavaBaseConstants .COLUMN_NAME_AB_EXTRACTED_AT ,
344
- timestampWithTimeZoneType.nullable(false ),
345
- )
346
- .column(
347
- JavaBaseConstants .COLUMN_NAME_AB_LOADED_AT ,
348
- timestampWithTimeZoneType.nullable(true ),
349
- )
350
- .column(JavaBaseConstants .COLUMN_NAME_DATA , structType.nullable(false ))
351
- .column(JavaBaseConstants .COLUMN_NAME_AB_META , structType.nullable(true ))
352
- .`as `(
353
- DSL .select(
354
- DSL .field(JavaBaseConstants .COLUMN_NAME_AB_ID )
355
- .`as `(JavaBaseConstants .COLUMN_NAME_AB_RAW_ID ),
356
- DSL .field(JavaBaseConstants .COLUMN_NAME_EMITTED_AT )
357
- .`as `(JavaBaseConstants .COLUMN_NAME_AB_EXTRACTED_AT ),
358
- DSL .cast(null , timestampWithTimeZoneType)
359
- .`as `(JavaBaseConstants .COLUMN_NAME_AB_LOADED_AT ),
360
- DSL .field(JavaBaseConstants .COLUMN_NAME_DATA )
361
- .`as `(JavaBaseConstants .COLUMN_NAME_DATA ),
362
- DSL .cast(null , structType).`as `(JavaBaseConstants .COLUMN_NAME_AB_META ),
363
- )
364
- .from(DSL .table(DSL .name(namespace, tableName))),
340
+ ): String {
341
+ val hasGenerationId = columns == DestinationColumns .V2_WITH_GENERATION
342
+
343
+ val createTable: CreateTableColumnStep =
344
+ dslContext
345
+ .createTable(rawTableName)
346
+ .column(
347
+ JavaBaseConstants .COLUMN_NAME_AB_RAW_ID ,
348
+ SQLDataType .VARCHAR (36 ).nullable(false ),
349
+ )
350
+ .column(
351
+ JavaBaseConstants .COLUMN_NAME_AB_EXTRACTED_AT ,
352
+ timestampWithTimeZoneType.nullable(false ),
353
+ )
354
+ .column(
355
+ JavaBaseConstants .COLUMN_NAME_AB_LOADED_AT ,
356
+ timestampWithTimeZoneType.nullable(true ),
357
+ )
358
+ .column(JavaBaseConstants .COLUMN_NAME_DATA , structType.nullable(false ))
359
+ .column(JavaBaseConstants .COLUMN_NAME_AB_META , structType.nullable(true ))
360
+ if (hasGenerationId) {
361
+ createTable.column(JavaBaseConstants .COLUMN_NAME_AB_GENERATION_ID , SQLDataType .BIGINT )
362
+ }
363
+
364
+ val selectColumns: MutableList <SelectFieldOrAsterisk > =
365
+ mutableListOf (
366
+ DSL .field(JavaBaseConstants .COLUMN_NAME_AB_ID )
367
+ .`as `(JavaBaseConstants .COLUMN_NAME_AB_RAW_ID ),
368
+ DSL .field(JavaBaseConstants .COLUMN_NAME_EMITTED_AT )
369
+ .`as `(JavaBaseConstants .COLUMN_NAME_AB_EXTRACTED_AT ),
370
+ DSL .cast(null , timestampWithTimeZoneType)
371
+ .`as `(JavaBaseConstants .COLUMN_NAME_AB_LOADED_AT ),
372
+ DSL .field(JavaBaseConstants .COLUMN_NAME_DATA )
373
+ .`as `(JavaBaseConstants .COLUMN_NAME_DATA ),
374
+ DSL .cast(null , structType).`as `(JavaBaseConstants .COLUMN_NAME_AB_META ),
375
+ DSL .value(0 ).`as `(JavaBaseConstants .COLUMN_NAME_AB_GENERATION_ID ),
365
376
)
377
+ if (hasGenerationId) {
378
+ selectColumns + = DSL .value(0 ).`as `(JavaBaseConstants .COLUMN_NAME_AB_GENERATION_ID )
379
+ }
380
+
381
+ return createTable
382
+ .`as `(DSL .select(selectColumns).from(DSL .table(DSL .name(namespace, tableName))))
366
383
.getSQL(ParamType .INLINED )
384
+ }
367
385
368
386
override fun clearLoadedAt (streamId : StreamId ): Sql {
369
387
return of(
0 commit comments