Skip to content

Commit 0e80ea3

Browse files
committed
snowflake cdk bump
1 parent 5fb7304 commit 0e80ea3

File tree

6 files changed

+70
-23
lines changed

6 files changed

+70
-23
lines changed

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -1117,9 +1117,7 @@ abstract class BaseTypingDedupingTest {
11171117
val expectedRawRecords0 = readRecords("dat/sync2_expectedrecords_raw.jsonl")
11181118
val expectedFinalRecords0 =
11191119
readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl")
1120-
(expectedFinalRecords0 + expectedRawRecords0).forEach {
1121-
(it as ObjectNode).put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, 41)
1122-
}
1120+
fixGenerationId(expectedRawRecords0, expectedFinalRecords0, 41)
11231121
verifySyncResult(expectedRawRecords0, expectedFinalRecords0, disableFinalTableComparison())
11241122

11251123
val catalog =
@@ -1324,7 +1322,10 @@ abstract class BaseTypingDedupingTest {
13241322
val sync2Records =
13251323
baseRecords.subList(expectedFinalRecords1.size, baseRecords.size).onEach {
13261324
(it as ObjectNode).put(
1327-
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
1325+
finalMetadataColumnNames.getOrDefault(
1326+
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
1327+
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
1328+
),
13281329
44,
13291330
)
13301331
}

airbyte-integrations/connectors/destination-snowflake/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
airbyteJavaConnector {
66
cdkVersionRequired = '0.44.3'
77
features = ['db-destinations', 's3-destinations', 'typing-deduping']
8-
useLocalCdk = false
8+
useLocalCdk = true
99
}
1010

1111
java {

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class SnowflakeStorageOperation(
100100
generationIdNode =
101101
results.first().get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase())
102102
}
103-
return generationIdNode?.asLong()
103+
return generationIdNode?.asLong() ?: 0
104104
}
105105

106106
internal fun createTableQuery(streamId: StreamId, suffix: String): String {

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt

+51-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.Union
2727
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
2828
import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils
2929
import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeState
30+
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator.Companion.QUOTE
3031
import java.sql.Connection
3132
import java.sql.DatabaseMetaData
3233
import java.sql.ResultSet
@@ -57,7 +58,7 @@ class SnowflakeDestinationHandler(
5758
namespace: String,
5859
name: String
5960
): Long? {
60-
return null
61+
throw NotImplementedError()
6162
}
6263
}
6364
) {
@@ -421,6 +422,12 @@ class SnowflakeDestinationHandler(
421422
streamConfig.id.asPair(),
422423
toDestinationState(emptyObject())
423424
)
425+
val finalTableGenerationId =
426+
if (isFinalTablePresent && !isFinalTableEmpty) {
427+
getFinalTableGenerationId(streamConfig.id)
428+
} else {
429+
null
430+
}
424431
return@map DestinationInitialStatus<SnowflakeState>(
425432
streamConfig,
426433
isFinalTablePresent,
@@ -429,7 +436,11 @@ class SnowflakeDestinationHandler(
429436
isSchemaMismatch,
430437
isFinalTableEmpty,
431438
destinationState,
432-
finalTableGenerationId = null,
439+
finalTableGenerationId = finalTableGenerationId,
440+
// I think the temp final table gen is always null?
441+
// since the only time we T+D into the temp table
442+
// is when we're committing the sync anyway
443+
// (i.e. we'll immediately rename it to the real table)
433444
finalTempTableGenerationId = null,
434445
)
435446
} catch (e: Exception) {
@@ -439,6 +450,44 @@ class SnowflakeDestinationHandler(
439450
.collect(Collectors.toList())
440451
}
441452

453+
/**
454+
* Query the final table to find the generation ID of any record. Assumes that the table exists
455+
* and is nonempty.
456+
*/
457+
private fun getFinalTableGenerationId(streamId: StreamId): Long? {
458+
val tableExistsWithGenerationId =
459+
jdbcDatabase.executeMetadataQuery {
460+
// Find a column named _airbyte_generation_id
461+
// in the relevant table.
462+
val resultSet =
463+
it.getColumns(
464+
databaseName,
465+
streamId.finalNamespace,
466+
streamId.finalName,
467+
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase()
468+
)
469+
// Check if there were any such columns.
470+
resultSet.next()
471+
}
472+
// The table doesn't exist, or exists but doesn't have generation id
473+
if (!tableExistsWithGenerationId) {
474+
return null
475+
}
476+
477+
return jdbcDatabase
478+
.queryJsons(
479+
"""
480+
SELECT ${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase()}
481+
FROM ${streamId.finalNamespace(QUOTE)}.${streamId.finalName(QUOTE)}
482+
LIMIT 1
483+
""".trimIndent(),
484+
)
485+
.first()
486+
.get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase())
487+
?.asLong()
488+
?: 0
489+
}
490+
442491
override fun toJdbcTypeName(airbyteType: AirbyteType): String {
443492
if (airbyteType is AirbyteProtocolType) {
444493
return toJdbcTypeName(airbyteType)

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientIntegrationTest.kt

+12-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,12 @@ class SnowflakeStagingClientIntegrationTest {
7878
throw NotImplementedError("This method should not be called in this test")
7979
}
8080

81-
override fun getDataRow(id: UUID, recordMessage: AirbyteRecordMessage): List<Any> {
81+
override fun getDataRow(
82+
id: UUID,
83+
recordMessage: AirbyteRecordMessage,
84+
generationId: Long,
85+
syncId: Long,
86+
): List<Any> {
8287
throw NotImplementedError("This method should not be called in this test")
8388
}
8489

@@ -129,7 +134,12 @@ class SnowflakeStagingClientIntegrationTest {
129134
throw NotImplementedError("This method should not be called in this test")
130135
}
131136

132-
override fun getDataRow(id: UUID, recordMessage: AirbyteRecordMessage): List<Any> {
137+
override fun getDataRow(
138+
id: UUID,
139+
recordMessage: AirbyteRecordMessage,
140+
generationId: Long,
141+
syncId: Long,
142+
): List<Any> {
133143
throw NotImplementedError("This method should not be called in this test")
134144
}
135145

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt

-13
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import kotlin.concurrent.Volatile
2727
import org.junit.jupiter.api.Assertions
2828
import org.junit.jupiter.api.Assertions.assertEquals
2929
import org.junit.jupiter.api.Assertions.assertThrows
30-
import org.junit.jupiter.api.Disabled
3130
import org.junit.jupiter.api.Test
3231

3332
private val LOGGER = KotlinLogging.logger {}
@@ -383,18 +382,6 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
383382
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
384383
}
385384

386-
@Test
387-
@Disabled
388-
override fun interruptedTruncateWithPriorData() {
389-
super.interruptedTruncateWithPriorData()
390-
}
391-
392-
@Test
393-
@Disabled
394-
override fun interruptedOverwriteWithoutPriorData() {
395-
super.interruptedOverwriteWithoutPriorData()
396-
}
397-
398385
private val defaultSchema: String
399386
get() = config!!["schema"].asText()
400387

0 commit comments

Comments
 (0)