Skip to content

Commit 7ba3e2d

Browse files
authored
Destination snowflake: pull in cdk update for refreshes bugfix (#42505)
1 parent a198273 commit 7ba3e2d

File tree

8 files changed

+114
-31
lines changed

8 files changed

+114
-31
lines changed

airbyte-integrations/connectors/destination-snowflake/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.44.3'
6+
cdkVersionRequired = '0.44.14'
77
features = ['db-destinations', 's3-destinations', 'typing-deduping']
88
useLocalCdk = false
99
}

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
8-
dockerImageTag: 3.11.7
8+
dockerImageTag: 3.11.8
99
dockerRepository: airbyte/destination-snowflake
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
1111
githubIssueLabel: destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class SnowflakeStorageOperation(
100100
generationIdNode =
101101
results.first().get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase())
102102
}
103-
return generationIdNode?.asLong()
103+
return generationIdNode?.asLong() ?: 0
104104
}
105105

106106
internal fun createTableQuery(streamId: StreamId, suffix: String): String {

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt

+51-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.Union
2727
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
2828
import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils
2929
import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeState
30+
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator.Companion.QUOTE
3031
import java.sql.Connection
3132
import java.sql.DatabaseMetaData
3233
import java.sql.ResultSet
@@ -57,7 +58,7 @@ class SnowflakeDestinationHandler(
5758
namespace: String,
5859
name: String
5960
): Long? {
60-
return null
61+
throw NotImplementedError()
6162
}
6263
}
6364
) {
@@ -421,6 +422,12 @@ class SnowflakeDestinationHandler(
421422
streamConfig.id.asPair(),
422423
toDestinationState(emptyObject())
423424
)
425+
val finalTableGenerationId =
426+
if (isFinalTablePresent && !isFinalTableEmpty) {
427+
getFinalTableGenerationId(streamConfig.id)
428+
} else {
429+
null
430+
}
424431
return@map DestinationInitialStatus<SnowflakeState>(
425432
streamConfig,
426433
isFinalTablePresent,
@@ -429,7 +436,11 @@ class SnowflakeDestinationHandler(
429436
isSchemaMismatch,
430437
isFinalTableEmpty,
431438
destinationState,
432-
finalTableGenerationId = null,
439+
finalTableGenerationId = finalTableGenerationId,
440+
// I think the temp final table gen is always null?
441+
// since the only time we T+D into the temp table
442+
// is when we're committing the sync anyway
443+
// (i.e. we'll immediately rename it to the real table)
433444
finalTempTableGenerationId = null,
434445
)
435446
} catch (e: Exception) {
@@ -439,6 +450,44 @@ class SnowflakeDestinationHandler(
439450
.collect(Collectors.toList())
440451
}
441452

453+
/**
454+
* Query the final table to find the generation ID of any record. Assumes that the table exists
455+
* and is nonempty.
456+
*/
457+
private fun getFinalTableGenerationId(streamId: StreamId): Long? {
458+
val tableExistsWithGenerationId =
459+
jdbcDatabase.executeMetadataQuery {
460+
// Find a column named _airbyte_generation_id
461+
// in the relevant table.
462+
val resultSet =
463+
it.getColumns(
464+
databaseName,
465+
streamId.finalNamespace,
466+
streamId.finalName,
467+
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase()
468+
)
469+
// Check if there were any such columns.
470+
resultSet.next()
471+
}
472+
// The table doesn't exist, or exists but doesn't have generation id
473+
if (!tableExistsWithGenerationId) {
474+
return null
475+
}
476+
477+
return jdbcDatabase
478+
.queryJsons(
479+
"""
480+
SELECT ${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase()}
481+
FROM ${streamId.finalNamespace(QUOTE)}.${streamId.finalName(QUOTE)}
482+
LIMIT 1
483+
""".trimIndent(),
484+
)
485+
.first()
486+
.get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase())
487+
?.asLong()
488+
?: 0
489+
}
490+
442491
override fun toJdbcTypeName(airbyteType: AirbyteType): String {
443492
if (airbyteType is AirbyteProtocolType) {
444493
return toJdbcTypeName(airbyteType)

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientIntegrationTest.kt

+12-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,12 @@ class SnowflakeStagingClientIntegrationTest {
7878
throw NotImplementedError("This method should not be called in this test")
7979
}
8080

81-
override fun getDataRow(id: UUID, recordMessage: AirbyteRecordMessage): List<Any> {
81+
override fun getDataRow(
82+
id: UUID,
83+
recordMessage: AirbyteRecordMessage,
84+
generationId: Long,
85+
syncId: Long,
86+
): List<Any> {
8287
throw NotImplementedError("This method should not be called in this test")
8388
}
8489

@@ -129,7 +134,12 @@ class SnowflakeStagingClientIntegrationTest {
129134
throw NotImplementedError("This method should not be called in this test")
130135
}
131136

132-
override fun getDataRow(id: UUID, recordMessage: AirbyteRecordMessage): List<Any> {
137+
override fun getDataRow(
138+
id: UUID,
139+
recordMessage: AirbyteRecordMessage,
140+
generationId: Long,
141+
syncId: Long,
142+
): List<Any> {
133143
throw NotImplementedError("This method should not be called in this test")
134144
}
135145

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt

+46-23
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ import javax.sql.DataSource
2626
import kotlin.concurrent.Volatile
2727
import org.junit.jupiter.api.Assertions
2828
import org.junit.jupiter.api.Assertions.assertEquals
29-
import org.junit.jupiter.api.Assertions.assertThrows
30-
import org.junit.jupiter.api.Disabled
3129
import org.junit.jupiter.api.Test
3230

3331
private val LOGGER = KotlinLogging.logger {}
3432

35-
abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
36-
private var databaseName: String? = null
33+
abstract class AbstractSnowflakeTypingDedupingTest(
34+
private val forceUppercaseIdentifiers: Boolean = false,
35+
) : BaseTypingDedupingTest() {
36+
private lateinit var databaseName: String
3737
private var database: JdbcDatabase? = null
3838
// not super happy with this one, but our test classes are not super kotlin-friendly
3939
private lateinit var dataSource: DataSource
@@ -55,7 +55,7 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
5555
dataSource =
5656
SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
5757
database = SnowflakeDatabaseUtils.getDatabase(dataSource)
58-
cleanAirbyteInternalTable(database)
58+
cleanAirbyteInternalTable(databaseName, database, forceUppercaseIdentifiers)
5959
return config
6060
}
6161

@@ -103,8 +103,15 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
103103
DROP SCHEMA IF EXISTS "%s" CASCADE
104104
105105
""".trimIndent(),
106-
rawSchema, // Raw table is still lowercase.
107-
StreamId.concatenateRawTableName(namespaceOrDefault, streamName),
106+
// Raw table is still lowercase by default
107+
if (forceUppercaseIdentifiers) {
108+
rawSchema.uppercase()
109+
} else {
110+
rawSchema
111+
},
112+
StreamId.concatenateRawTableName(namespaceOrDefault, streamName).let {
113+
if (forceUppercaseIdentifiers) it.uppercase() else it
114+
},
108115
namespaceOrDefault.uppercase(Locale.getDefault()),
109116
),
110117
)
@@ -383,18 +390,6 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
383390
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
384391
}
385392

386-
@Test
387-
@Disabled
388-
override fun interruptedTruncateWithPriorData() {
389-
super.interruptedTruncateWithPriorData()
390-
}
391-
392-
@Test
393-
@Disabled
394-
override fun interruptedOverwriteWithoutPriorData() {
395-
super.interruptedOverwriteWithoutPriorData()
396-
}
397-
398393
private val defaultSchema: String
399394
get() = config!!["schema"].asText()
400395

@@ -419,17 +414,45 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
419414
@Volatile private var cleanedAirbyteInternalTable = false
420415

421416
@Throws(SQLException::class)
422-
private fun cleanAirbyteInternalTable(database: JdbcDatabase?) {
417+
private fun cleanAirbyteInternalTable(
418+
databaseName: String,
419+
database: JdbcDatabase?,
420+
forceUppercase: Boolean,
421+
) {
423422
if (!cleanedAirbyteInternalTable) {
424423
synchronized(AbstractSnowflakeTypingDedupingTest::class.java) {
425424
if (!cleanedAirbyteInternalTable) {
426-
database!!.execute(
427-
"DELETE FROM \"airbyte_internal\".\"_airbyte_destination_state\" WHERE \"updated_at\" < current_date() - 7",
428-
)
425+
val destinationStateTableExists =
426+
database!!.executeMetadataQuery {
427+
it.getTables(
428+
databaseName,
429+
if (forceUppercase) {
430+
"AIRBYTE_INTERNAL"
431+
} else {
432+
"airbyte_internal"
433+
},
434+
if (forceUppercase) {
435+
"_AIRBYTE_DESTINATION_STATE"
436+
} else {
437+
"_airbyte_destination_state"
438+
},
439+
null
440+
)
441+
.next()
442+
}
443+
if (destinationStateTableExists) {
444+
database.execute(
445+
"""DELETE FROM "airbyte_internal"."_airbyte_destination_state" WHERE "updated_at" < current_date() - 7""",
446+
)
447+
}
429448
cleanedAirbyteInternalTable = true
430449
}
431450
}
432451
}
433452
}
434453
}
435454
}
455+
456+
open class Batch(val name: String)
457+
458+
class LocalFileBatch(name: String) : Batch(name)

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeInternalStagingCaseInsensitiveTypingDedupingTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import io.airbyte.commons.json.Jsons.emptyObject
99
import java.util.*
1010

1111
class SnowflakeInternalStagingCaseInsensitiveTypingDedupingTest :
12-
AbstractSnowflakeTypingDedupingTest() {
12+
AbstractSnowflakeTypingDedupingTest(forceUppercaseIdentifiers = true) {
1313
override val configPath: String
1414
get() = "secrets/1s1t_case_insensitive.json"
1515

docs/integrations/destinations/snowflake.md

+1
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ desired namespace.
268268

269269
| Version | Date | Pull Request | Subject |
270270
|:----------------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
271+
| 3.11.8 | 2024-08-16 | [\#42505](https://github.com/airbytehq/airbyte/pull/42505) | Fix bug in refreshes logic (already mitigated in platform, just fixing protocol compliance) |
271272
| 3.11.7 | 2024-08-09 | [\#43440](https://github.com/airbytehq/airbyte/pull/43440) | remove contention on state table by deleting rows ony once every 100 updates |
272273
| 3.11.6 | 2024-08-09 | [\#43332](https://github.com/airbytehq/airbyte/pull/43332) | bump Java CDK |
273274
| 3.11.5 | 2024-08-07 | [\#43348](https://github.com/airbytehq/airbyte/pull/43348) | SnowflakeSqlGen cleanup to Kotlin string interpolation |

0 commit comments

Comments
 (0)