Skip to content

Commit 8f94ff2

Browse files
committed
snowflake-genid-meta
1 parent 459c37c commit 8f94ff2

File tree

5 files changed

+53
-21
lines changed

5 files changed

+53
-21
lines changed

airbyte-integrations/connectors/destination-snowflake/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
airbyteJavaConnector {
66
cdkVersionRequired = '0.35.15'
77
features = ['db-destinations', 's3-destinations', 'typing-deduping']
8-
useLocalCdk = false
8+
useLocalCdk = true
99
}
1010

1111
java {

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,16 @@ class SnowflakeStorageOperation(
6060
return "TRUNCATE TABLE \"${streamId.rawNamespace}\".\"${streamId.rawName}\";\n"
6161
}
6262

63-
override fun writeToStage(streamId: StreamId, data: SerializableBuffer) {
64-
val stageName = getStageName(streamId)
63+
override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
64+
val stageName = getStageName(streamConfig.id)
6565
val stagingPath = getStagingPath()
6666
val stagedFileName = staging.uploadRecordsToStage(data, stageName, stagingPath)
67-
staging.copyIntoTableFromStage(stageName, stagingPath, listOf(stagedFileName), streamId)
67+
staging.copyIntoTableFromStage(
68+
stageName,
69+
stagingPath,
70+
listOf(stagedFileName),
71+
streamConfig.id
72+
)
6873
}
6974
override fun cleanupStage(streamId: StreamId) {
7075
val stageName = getStageName(streamId)

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.kt

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ class SnowflakeSqlGenerator(private val retentionPeriodDays: Int) : SqlGenerator
116116
CREATE ${'$'}{force_create_table} TABLE ${'$'}{final_table_id} (
117117
"_AIRBYTE_RAW_ID" TEXT NOT NULL,
118118
"_AIRBYTE_EXTRACTED_AT" TIMESTAMP_TZ NOT NULL,
119-
"_AIRBYTE_META" VARIANT NOT NULL
119+
"_AIRBYTE_META" VARIANT NOT NULL,
120+
"_AIRBYTE_GENERATION_ID" INTEGER
120121
${'$'}{column_declarations}
121122
) data_retention_time_in_days = ${'$'}{retention_period_days};
122123
@@ -282,7 +283,8 @@ class SnowflakeSqlGenerator(private val retentionPeriodDays: Int) : SqlGenerator
282283
${'$'}{column_list}
283284
"_AIRBYTE_META",
284285
"_AIRBYTE_RAW_ID",
285-
"_AIRBYTE_EXTRACTED_AT"
286+
"_AIRBYTE_EXTRACTED_AT",
287+
"_AIRBYTE_GENERATION_ID"
286288
)
287289
${'$'}{extractNewRawRecords};
288290
""".trimIndent()
@@ -392,7 +394,8 @@ class SnowflakeSqlGenerator(private val retentionPeriodDays: Int) : SqlGenerator
392394
${'$'}{column_casts}
393395
ARRAY_CONSTRUCT_COMPACT(${'$'}{column_errors}) as "_airbyte_cast_errors",
394396
"_airbyte_raw_id",
395-
${'$'}{airbyte_extracted_at_utc} as "_airbyte_extracted_at"
397+
${'$'}{airbyte_extracted_at_utc} as "_airbyte_extracted_at",
398+
"_airbyte_generation_id"
396399
FROM ${'$'}{raw_table_id}
397400
WHERE (
398401
"_airbyte_loaded_at" IS NULL
@@ -403,15 +406,16 @@ class SnowflakeSqlGenerator(private val retentionPeriodDays: Int) : SqlGenerator
403406
${'$'}{column_list}
404407
OBJECT_CONSTRUCT('errors', "_airbyte_cast_errors") AS "_AIRBYTE_META",
405408
"_airbyte_raw_id" AS "_AIRBYTE_RAW_ID",
406-
"_airbyte_extracted_at" AS "_AIRBYTE_EXTRACTED_AT"
409+
"_airbyte_extracted_at" AS "_AIRBYTE_EXTRACTED_AT",
410+
"_airbyte_generation_id" AS "_AIRBYTE_GENERATION_ID"
407411
FROM intermediate_data
408412
), numbered_rows AS (
409413
SELECT *, row_number() OVER (
410414
PARTITION BY ${'$'}{pk_list} ORDER BY ${'$'}{cursor_order_clause} "_AIRBYTE_EXTRACTED_AT" DESC
411415
) AS row_number
412416
FROM new_records
413417
)
414-
SELECT ${'$'}{column_list} "_AIRBYTE_META", "_AIRBYTE_RAW_ID", "_AIRBYTE_EXTRACTED_AT"
418+
SELECT ${'$'}{column_list} "_AIRBYTE_META", "_AIRBYTE_RAW_ID", "_AIRBYTE_EXTRACTED_AT", "_AIRBYTE_GENERATION_ID"
415419
FROM numbered_rows
416420
WHERE row_number = 1
417421
""".trimIndent()
@@ -440,7 +444,8 @@ class SnowflakeSqlGenerator(private val retentionPeriodDays: Int) : SqlGenerator
440444
${'$'}{column_casts}
441445
ARRAY_CONSTRUCT_COMPACT(${'$'}{column_errors}) as "_airbyte_cast_errors",
442446
"_airbyte_raw_id",
443-
${'$'}{airbyte_extracted_at_utc} as "_airbyte_extracted_at"
447+
${'$'}{airbyte_extracted_at_utc} as "_airbyte_extracted_at",
448+
"_airbyte_generation_id"
444449
FROM ${'$'}{raw_table_id}
445450
WHERE
446451
"_airbyte_loaded_at" IS NULL
@@ -450,7 +455,8 @@ class SnowflakeSqlGenerator(private val retentionPeriodDays: Int) : SqlGenerator
450455
${'$'}{column_list}
451456
OBJECT_CONSTRUCT('errors', "_airbyte_cast_errors") AS "_AIRBYTE_META",
452457
"_airbyte_raw_id" AS "_AIRBYTE_RAW_ID",
453-
"_airbyte_extracted_at" AS "_AIRBYTE_EXTRACTED_AT"
458+
"_airbyte_extracted_at" AS "_AIRBYTE_EXTRACTED_AT",
459+
"_airbyte_generation_id" AS "_AIRBYTE_GENERATION_ID"
454460
FROM intermediate_data
455461
""".trimIndent()
456462
)

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.kt

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ class SnowflakeSqlGeneratorIntegrationTest : BaseSqlGeneratorIntegrationTest<Sno
8383
"_airbyte_raw_id" TEXT NOT NULL,
8484
"_airbyte_data" VARIANT NOT NULL,
8585
"_airbyte_extracted_at" TIMESTAMP_TZ NOT NULL,
86-
"_airbyte_loaded_at" TIMESTAMP_TZ
86+
"_airbyte_loaded_at" TIMESTAMP_TZ,
87+
"_airbyte_meta" VARIANT,
88+
"_airbyte_generation_id" INTEGER
8789
)
8890
8991
""".trimIndent()
@@ -123,12 +125,13 @@ class SnowflakeSqlGeneratorIntegrationTest : BaseSqlGeneratorIntegrationTest<Sno
123125
includeCdcDeletedAt: Boolean,
124126
streamId: StreamId,
125127
suffix: String?,
126-
records: List<JsonNode>
128+
records: List<JsonNode>,
129+
generationId: Long
127130
) {
128131
val columnNames =
129132
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
130133
val cdcDeletedAtName = if (includeCdcDeletedAt) ",\"_AB_CDC_DELETED_AT\"" else ""
131-
val cdcDeletedAtExtract = if (includeCdcDeletedAt) ",column19" else ""
134+
val cdcDeletedAtExtract = if (includeCdcDeletedAt) ",column20" else ""
132135
val recordsText =
133136
records
134137
.stream() // For each record, convert it to a string like "(rawId, extractedAt,
@@ -169,6 +172,7 @@ class SnowflakeSqlGeneratorIntegrationTest : BaseSqlGeneratorIntegrationTest<Sno
169172
"_AIRBYTE_RAW_ID",
170173
"_AIRBYTE_EXTRACTED_AT",
171174
"_AIRBYTE_META",
175+
"_AIRBYTE_GENERATION_ID",
172176
"ID1",
173177
"ID2",
174178
"UPDATED_AT",
@@ -193,9 +197,9 @@ class SnowflakeSqlGeneratorIntegrationTest : BaseSqlGeneratorIntegrationTest<Sno
193197
column4,
194198
column5,
195199
column6,
196-
PARSE_JSON(column7),
200+
column7,
197201
PARSE_JSON(column8),
198-
column9,
202+
PARSE_JSON(column9),
199203
column10,
200204
column11,
201205
column12,
@@ -204,7 +208,8 @@ class SnowflakeSqlGeneratorIntegrationTest : BaseSqlGeneratorIntegrationTest<Sno
204208
column15,
205209
column16,
206210
column17,
207-
PARSE_JSON(column18)
211+
column18,
212+
PARSE_JSON(column19)
208213
#{cdc_deleted_at_extract}
209214
FROM VALUES
210215
#{records}
@@ -230,7 +235,7 @@ class SnowflakeSqlGeneratorIntegrationTest : BaseSqlGeneratorIntegrationTest<Sno
230235
.stream() // For each record, convert it to a string like "(rawId, extractedAt,
231236
// loadedAt, data)"
232237
.map { record: JsonNode ->
233-
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES.stream()
238+
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION.stream()
234239
.map { fieldName: String? -> record[fieldName] }
235240
.map { node: JsonNode? -> this.dollarQuoteWrap(node) }
236241
.collect(Collectors.joining(","))
@@ -341,6 +346,7 @@ class SnowflakeSqlGeneratorIntegrationTest : BaseSqlGeneratorIntegrationTest<Sno
341346
.put("_AIRBYTE_RAW_ID", "TEXT")
342347
.put("_AIRBYTE_EXTRACTED_AT", "TIMESTAMP_TZ")
343348
.put("_AIRBYTE_META", "VARIANT")
349+
.put("_AIRBYTE_GENERATION_ID", "NUMBER(38, 0)")
344350
.put("ID1", "NUMBER(38, 0)")
345351
.put("ID2", "NUMBER(38, 0)")
346352
.put("UPDATED_AT", "TIMESTAMP_TZ")
@@ -708,7 +714,8 @@ class SnowflakeSqlGeneratorIntegrationTest : BaseSqlGeneratorIntegrationTest<Sno
708714
709715
""".trimIndent()
710716
)
711-
)
717+
),
718+
0
712719
)
713720
// Gather initial state at the start of our updated sync
714721
val initialState =
@@ -1234,7 +1241,8 @@ class SnowflakeSqlGeneratorIntegrationTest : BaseSqlGeneratorIntegrationTest<Sno
12341241
12351242
""".trimIndent()
12361243
)
1237-
)
1244+
),
1245+
0
12381246
)
12391247
// Gather initial state at the start of our updated sync
12401248
val initialState =

airbyte-integrations/connectors/destination-snowflake/src/test/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperationTest.kt

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ package io.airbyte.integrations.destination.snowflake.operation
77
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
88
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer
99
import io.airbyte.integrations.base.destination.typing_deduping.Sql
10+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
1011
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
1112
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler
1213
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator
1314
import io.airbyte.protocol.models.v0.DestinationSyncMode
15+
import java.util.*
1416
import org.junit.jupiter.api.AfterEach
1517
import org.junit.jupiter.api.Test
1618
import org.mockito.Mockito.mock
@@ -78,7 +80,7 @@ class SnowflakeStorageOperationTest {
7880
val storageOperation =
7981
SnowflakeStorageOperation(sqlGenerator, destinationHandler, 1, stagingClient)
8082

81-
storageOperation.writeToStage(streamId, data)
83+
storageOperation.writeToStage(streamConfig, data)
8284
val inOrder = inOrder(stagingClient)
8385
inOrder.verify(stagingClient).uploadRecordsToStage(any(), eq(stageName), any())
8486
inOrder
@@ -105,5 +107,16 @@ class SnowflakeStorageOperationTest {
105107
"original_namespace",
106108
"original_name",
107109
)
110+
val streamConfig =
111+
StreamConfig(
112+
streamId,
113+
DestinationSyncMode.OVERWRITE,
114+
listOf(),
115+
Optional.empty(),
116+
linkedMapOf(),
117+
0,
118+
0,
119+
0,
120+
)
108121
}
109122
}

0 commit comments

Comments
 (0)