@@ -7,12 +7,14 @@ import com.fasterxml.jackson.databind.JsonNode
7
7
import io.airbyte.cdk.db.jdbc.JdbcDatabase
8
8
import io.airbyte.cdk.integrations.base.JavaBaseConstants
9
9
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
10
+ import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
10
11
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_ID
11
12
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT
12
13
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META
13
14
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID
14
15
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA
15
16
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT
17
+ import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
16
18
import io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS
17
19
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator
18
20
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
@@ -90,16 +92,18 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
90
92
91
93
@Throws(Exception ::class )
92
94
override fun createRawTable (streamId : StreamId ) {
93
- database.execute(
95
+ val columns =
94
96
dslContext
95
97
.createTable(DSL .name(streamId.rawNamespace, streamId.rawName))
96
98
.column(COLUMN_NAME_AB_RAW_ID , SQLDataType .VARCHAR (36 ).nullable(false ))
97
99
.column(COLUMN_NAME_AB_EXTRACTED_AT , timestampWithTimeZoneType.nullable(false ))
98
100
.column(COLUMN_NAME_AB_LOADED_AT , timestampWithTimeZoneType)
99
101
.column(COLUMN_NAME_DATA , structType.nullable(false ))
100
102
.column(COLUMN_NAME_AB_META , structType.nullable(true ))
101
- .getSQL(ParamType .INLINED )
102
- )
103
+ if (sqlGenerator.columns == DestinationColumns .V2_WITH_GENERATION ) {
104
+ columns.column(JavaBaseConstants .COLUMN_NAME_AB_GENERATION_ID , SQLDataType .BIGINT )
105
+ }
106
+ database.execute(columns.getSQL(ParamType .INLINED ))
103
107
}
104
108
105
109
@Throws(Exception ::class )
@@ -118,7 +122,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
118
122
public override fun insertRawTableRecords (streamId : StreamId , records : List <JsonNode >) {
119
123
insertRecords(
120
124
DSL .name(streamId.rawNamespace, streamId.rawName),
121
- JavaBaseConstants . V2_RAW_TABLE_COLUMN_NAMES ,
125
+ sqlGenerator.columns.rawColumns ,
122
126
records,
123
127
COLUMN_NAME_DATA ,
124
128
COLUMN_NAME_AB_META
@@ -143,9 +147,12 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
143
147
records : List <JsonNode >,
144
148
generationId : Long ,
145
149
) {
146
- // TODO handle generation ID
147
150
val columnNames =
148
- if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
151
+ (if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES )
152
+ .toMutableList()
153
+ if (sqlGenerator.columns == DestinationColumns .V2_WITH_GENERATION ) {
154
+ columnNames + = COLUMN_NAME_AB_GENERATION_ID
155
+ }
149
156
insertRecords(
150
157
DSL .name(streamId.finalNamespace, streamId.finalName + suffix),
151
158
columnNames,
0 commit comments