Commit 9f0ce4f

Destination Snowflake: Sync Id, generation_id and Meta (#39107)
1 parent 7e281dd commit 9f0ce4f

46 files changed: +533, -222 lines changed


airbyte-integrations/connectors/destination-snowflake/build.gradle

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ plugins {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.35.15'
+    cdkVersionRequired = '0.37.1'
     features = ['db-destinations', 's3-destinations', 'typing-deduping']
    useLocalCdk = false
 }

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.9.1
+  dockerImageTag: 3.10.0
   dockerRepository: airbyte/destination-snowflake
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
   githubIssueLabel: destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt

Lines changed: 20 additions & 23 deletions
@@ -34,6 +34,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
 import io.airbyte.integrations.base.destination.typing_deduping.Sql
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
+import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeAbMetaAndGenIdMigration
 import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeDV2Migration
 import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeState
 import io.airbyte.integrations.destination.snowflake.operation.SnowflakeStagingClient
@@ -43,6 +44,7 @@ import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSq
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
 import io.airbyte.protocol.models.v0.AirbyteMessage
 import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
+import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
 import io.airbyte.protocol.models.v0.DestinationSyncMode
 import java.util.*
@@ -53,7 +55,6 @@ import javax.sql.DataSource
 import net.snowflake.client.core.SFSession
 import net.snowflake.client.core.SFStatement
 import net.snowflake.client.jdbc.SnowflakeSQLException
-import org.apache.commons.lang3.StringUtils
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -63,7 +64,7 @@ constructor(
     private val airbyteEnvironment: String,
     private val nameTransformer: NamingConventionTransformer = SnowflakeSQLNameTransformer(),
 ) : BaseConnector(), Destination {
-    private val destinationColumns = JavaBaseConstants.DestinationColumns.V2_WITHOUT_META
+    private val destinationColumns = JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION
 
     override fun check(config: JsonNode): AirbyteConnectionStatus? {
         val dataSource = getDataSource(config)
@@ -123,7 +124,8 @@ constructor(
                 ),
             isSchemaMismatch = true,
             isFinalTableEmpty = true,
-            destinationState = SnowflakeState(false)
+            destinationState =
+                SnowflakeState(needsSoftReset = false, isAirbyteMetaPresentInRaw = false)
         )
         // We simulate a mini-sync to see the raw table code path is exercised. and disable T+D
         snowflakeDestinationHandler.createNamespaces(setOf(rawTableSchemaName, outputSchema))
@@ -151,7 +153,10 @@ constructor(
                 ),
             )
         streamOperation.writeRecords(streamConfig, listOf(message).stream())
-        streamOperation.finalizeTable(streamConfig, StreamSyncSummary.DEFAULT)
+        streamOperation.finalizeTable(
+            streamConfig,
+            StreamSyncSummary(1, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE),
+        )
         // clean up the raw table, this is intentionally not part of actual sync code
         // because we avoid dropping original tables directly.
         snowflakeDestinationHandler.execute(
@@ -190,41 +195,34 @@ constructor(
         AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config)
 
         val defaultNamespace = config["schema"].asText()
-        for (stream in catalog.streams) {
-            if (StringUtils.isEmpty(stream.stream.namespace)) {
-                stream.stream.namespace = defaultNamespace
-            }
-        }
-
         val retentionPeriodDays =
             getRetentionPeriodDays(
                 config[RETENTION_PERIOD_DAYS],
             )
         val sqlGenerator = SnowflakeSqlGenerator(retentionPeriodDays)
         val database = getDatabase(getDataSource(config))
         val databaseName = config[JdbcUtils.DATABASE_KEY].asText()
-        val rawTableSchemaName: String
-        val catalogParser: CatalogParser
-        if (getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent) {
-            rawTableSchemaName = getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get()
-            catalogParser = CatalogParser(sqlGenerator, rawTableSchemaName)
-        } else {
-            rawTableSchemaName = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE
-            catalogParser = CatalogParser(sqlGenerator)
-        }
+        val rawTableSchemaName: String =
+            if (getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent) {
+                getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get()
+            } else {
+                JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE
+            }
+        val catalogParser = CatalogParser(sqlGenerator, defaultNamespace, rawTableSchemaName)
         val snowflakeDestinationHandler =
             SnowflakeDestinationHandler(databaseName, database, rawTableSchemaName)
         val parsedCatalog: ParsedCatalog = catalogParser.parseCatalog(catalog)
         val disableTypeDedupe =
             config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false)
-        val migrations =
-            listOf<Migration<SnowflakeState>>(
+        val migrations: List<Migration<SnowflakeState>> =
+            listOf(
                 SnowflakeDV2Migration(
                     nameTransformer,
                     database,
                     databaseName,
                     sqlGenerator,
                 ),
+                SnowflakeAbMetaAndGenIdMigration(database),
            )
 
         val snowflakeStagingClient = SnowflakeStagingClient(database)
@@ -264,8 +262,7 @@ constructor(
             },
             onFlush = DefaultFlush(optimalFlushBatchSize, syncOperation),
             catalog = catalog,
-            bufferManager = BufferManager(snowflakeBufferMemoryLimit),
-            defaultNamespace = Optional.of(defaultNamespace),
+            bufferManager = BufferManager(defaultNamespace, snowflakeBufferMemoryLimit)
         )
     }
 
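
Note: the deleted loop above was the old inline default-namespace fallback; CatalogParser and BufferManager now receive defaultNamespace directly. A minimal sketch of that fallback, assuming the parser applies the same empty-check the removed loop did (hypothetical helper, not the CDK's actual code):

    // Hypothetical: streams with no namespace fall back to the configured schema.
    fun resolveNamespace(streamNamespace: String?, defaultNamespace: String): String =
        if (streamNamespace.isNullOrEmpty()) defaultNamespace else streamNamespace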

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/migrations/SnowflakeAbMetaAndGenIdMigration.kt

Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.snowflake.migrations
+
+import io.airbyte.cdk.db.jdbc.JdbcDatabase
+import io.airbyte.cdk.integrations.base.JavaBaseConstants
+import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
+import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
+import io.airbyte.commons.json.Jsons
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.util.LinkedHashMap
+
+private val log = KotlinLogging.logger {}
+
+class SnowflakeAbMetaAndGenIdMigration(private val database: JdbcDatabase) :
+    Migration<SnowflakeState> {
+    override fun migrateIfNecessary(
+        destinationHandler: DestinationHandler<SnowflakeState>,
+        stream: StreamConfig,
+        state: DestinationInitialStatus<SnowflakeState>
+    ): Migration.MigrationResult<SnowflakeState> {
+        if (state.destinationState.isAirbyteMetaPresentInRaw) {
+            log.info {
+                "Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} " +
+                    "because previous destination state has isAirbyteMetaPresent"
+            }
+            return Migration.MigrationResult(state.destinationState, false)
+        }
+
+        if (!state.initialRawTableStatus.rawTableExists) {
+            // The raw table doesn't exist. No migration necessary. Update the state.
+            log.info {
+                "Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} because the raw table doesn't exist"
+            }
+            return Migration.MigrationResult(
+                state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
+                false
+            )
+        }
+
+        // Snowflake will match the lowercase raw table even with QUOTED_IDENTIFIER_IGNORE_CASE =
+        // TRUE
+        val results =
+            database.queryJsons(
+                "SHOW COLUMNS IN TABLE \"${stream.id.rawNamespace}\".\"${stream.id.rawName}\""
+            )
+        val rawTableDefinition =
+            results
+                .groupBy { it.get("schema_name").asText()!! }
+                .mapValues { (_, v) ->
+                    v.groupBy { it.get("table_name").asText()!! }
+                        .mapValuesTo(LinkedHashMap()) { (_, v) ->
+                            TableDefinition(
+                                v.associateTo(LinkedHashMap()) {
+                                    // return value of data_type in show columns is a json string.
+                                    val dataType = Jsons.deserialize(it.get("data_type").asText())
+                                    it.get("column_name").asText()!! to
+                                        ColumnDefinition(
+                                            it.get("column_name").asText(),
+                                            dataType.get("type").asText(),
+                                            0,
+                                            dataType.get("nullable").asBoolean(),
+                                        )
+                                },
+                            )
+                        }
+                }
+        // default is lower case raw tables, for accounts with QUOTED_IDENTIFIER_IGNORE_CASE = TRUE
+        // we have to match uppercase
+        val isUpperCaseIdentifer =
+            !rawTableDefinition.containsKey(stream.id.rawNamespace) &&
+                rawTableDefinition.containsKey(stream.id.rawNamespace.uppercase())
+        val rawNamespace: String
+        val rawName: String
+        val abMetaColumn: String
+        if (isUpperCaseIdentifer) {
+            rawNamespace = stream.id.rawNamespace.uppercase()
+            rawName = stream.id.rawName.uppercase()
+            abMetaColumn = JavaBaseConstants.COLUMN_NAME_AB_META.uppercase()
+        } else {
+            rawNamespace = stream.id.rawNamespace
+            rawName = stream.id.rawName
+            abMetaColumn = JavaBaseConstants.COLUMN_NAME_AB_META
+        }
+        rawTableDefinition[rawNamespace]?.get(rawName)?.let { tableDefinition ->
+            if (tableDefinition.columns.containsKey(abMetaColumn)) {
+                log.info {
+                    "Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} " +
+                        "because the raw table already has the airbyte_meta column"
+                }
+            } else {
+                log.info {
+                    "Migrating airbyte_meta/generation_id for table ${stream.id.rawNamespace}.${stream.id.rawName}"
+                }
+                // Quote for raw table columns
+                val alterRawTableSql =
+                    """
+                    ALTER TABLE "${stream.id.rawNamespace}"."${stream.id.rawName}"
+                    ADD COLUMN "${JavaBaseConstants.COLUMN_NAME_AB_META}" VARIANT,
+                    COLUMN "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}" INTEGER;
+                    """.trimIndent()
+                database.execute(alterRawTableSql)
+            }
+        }
+
+        // To avoid another metadata query in Snowflake, we rely on the initial status gathering
+        // which already checks for the columns in the final table to indicate schema mismatch
+        // to safeguard if the schema mismatch is due to meta columns or customer's column
+        // executing an add column with if not exists check
+        if (state.isFinalTablePresent && state.isSchemaMismatch) {
+            log.info {
+                "Migrating generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName}"
+            }
+            // explicitly uppercase and quote the final table column.
+            val alterFinalTableSql =
+                """
+                ALTER TABLE "${stream.id.finalNamespace}"."${stream.id.finalName}"
+                ADD COLUMN IF NOT EXISTS "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase()}" INTEGER;
+                """.trimIndent()
+            database.execute(alterFinalTableSql)
+            // Final table schema changed, fetch the initial status again
+            return Migration.MigrationResult(
+                state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
+                true
+            )
+        }
+
+        // Final table is untouched, so we don't need to fetch the initial status
+        return Migration.MigrationResult(
+            state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
+            false
+        )
+    }
+}
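
Note: the subtlest branch above is the identifier-case fallback, since accounts with QUOTED_IDENTIFIERS_IGNORE_CASE = TRUE report uppercase identifiers from SHOW COLUMNS. A standalone sketch of the same selection logic (hypothetical function and parameter names, not part of the commit):

    // Pick the identifier casing that actually appears in the SHOW COLUMNS output.
    // schemasSeen stands in for rawTableDefinition.keys in the migration above.
    fun resolveRawIdentifiers(
        rawNamespace: String,
        rawName: String,
        schemasSeen: Set<String>,
    ): Pair<String, String> {
        val isUpperCase =
            rawNamespace !in schemasSeen && rawNamespace.uppercase() in schemasSeen
        return if (isUpperCase) {
            rawNamespace.uppercase() to rawName.uppercase()
        } else {
            rawNamespace to rawName
        }
    }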

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/migrations/SnowflakeDV2Migration.kt

Lines changed: 4 additions & 1 deletion
@@ -31,6 +31,9 @@ class SnowflakeDV2Migration(
     ): Migration.MigrationResult<SnowflakeState> {
         log.info { "Initializing DV2 Migration check" }
         legacyV1V2migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream)
-        return Migration.MigrationResult(SnowflakeState(false), true)
+        return Migration.MigrationResult(
+            SnowflakeState(needsSoftReset = false, isAirbyteMetaPresentInRaw = false),
+            true
+        )
     }
 }

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/migrations/SnowflakeState.kt

Lines changed: 2 additions & 1 deletion
@@ -8,7 +8,8 @@ import io.airbyte.integrations.base.destination.typing_deduping.migrators.Minimu
 
 // Note the nonnullable fields. Even though the underlying storage medium (a JSON blob) supports
 // nullability, we don't want to deal with that in our codebase.
-data class SnowflakeState(val needsSoftReset: Boolean) : MinimumDestinationState {
+data class SnowflakeState(val needsSoftReset: Boolean, val isAirbyteMetaPresentInRaw: Boolean) :
+    MinimumDestinationState {
     override fun needsSoftReset(): Boolean {
         return needsSoftReset
     }
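
Note: the is-prefixed Boolean matters for state round-tripping. Jackson strips the "is" prefix from the generated getter, so the property serializes as airbyteMetaPresentInRaw, which is why SnowflakeDestinationHandler.toDestinationState (further down) reads that key. A minimal sketch, assuming jackson-module-kotlin on the classpath:

    import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper

    data class State(val needsSoftReset: Boolean, val isAirbyteMetaPresentInRaw: Boolean)

    fun main() {
        // Prints something like: {"needsSoftReset":false,"airbyteMetaPresentInRaw":true}
        // (note the stripped "is" prefix on the second property).
        println(jacksonObjectMapper().writeValueAsString(State(false, true)))
    }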

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStorageOperation.kt

Lines changed: 11 additions & 4 deletions
@@ -51,7 +51,9 @@ class SnowflakeStorageOperation(
             | "${JavaBaseConstants.COLUMN_NAME_AB_RAW_ID}" VARCHAR PRIMARY KEY,
             | "${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
             | "${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT}" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
-            | "${JavaBaseConstants.COLUMN_NAME_DATA}" VARIANT
+            | "${JavaBaseConstants.COLUMN_NAME_DATA}" VARIANT,
+            | "${JavaBaseConstants.COLUMN_NAME_AB_META}" VARIANT DEFAULT NULL,
+            | "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}" INTEGER DEFAULT NULL
             |) data_retention_time_in_days = $retentionPeriodDays;
             """.trimMargin()
     }
@@ -60,11 +62,16 @@ class SnowflakeStorageOperation(
         return "TRUNCATE TABLE \"${streamId.rawNamespace}\".\"${streamId.rawName}\";\n"
     }
 
-    override fun writeToStage(streamId: StreamId, data: SerializableBuffer) {
-        val stageName = getStageName(streamId)
+    override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
+        val stageName = getStageName(streamConfig.id)
         val stagingPath = getStagingPath()
         val stagedFileName = staging.uploadRecordsToStage(data, stageName, stagingPath)
-        staging.copyIntoTableFromStage(stageName, stagingPath, listOf(stagedFileName), streamId)
+        staging.copyIntoTableFromStage(
+            stageName,
+            stagingPath,
+            listOf(stagedFileName),
+            streamConfig.id
+        )
     }
     override fun cleanupStage(streamId: StreamId) {
         val stageName = getStageName(streamId)
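
Note: with the two added columns, the raw-table DDL built above renders roughly as follows for a hypothetical airbyte_internal.users_raw stream (the CREATE TABLE prefix sits above this hunk and is assumed; the column names come from JavaBaseConstants, and retention defaults to 1 day):

    CREATE TABLE IF NOT EXISTS "airbyte_internal"."users_raw" (
      "_airbyte_raw_id" VARCHAR PRIMARY KEY,
      "_airbyte_extracted_at" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
      "_airbyte_loaded_at" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
      "_airbyte_data" VARIANT,
      "_airbyte_meta" VARIANT DEFAULT NULL,
      "_airbyte_generation_id" INTEGER DEFAULT NULL
    ) data_retention_time_in_days = 1;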

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt

Lines changed: 29 additions & 7 deletions
@@ -100,11 +100,19 @@ class SnowflakeDestinationHandler(
         database.executeMetadataQuery { databaseMetaData: DatabaseMetaData ->
             LOGGER.info("Retrieving table from Db metadata: {} {}", id.rawNamespace, id.rawName)
             try {
-                databaseMetaData
-                    .getTables(databaseName, id.rawNamespace, id.rawName, null)
-                    .use { tables ->
-                        return@executeMetadataQuery tables.next()
-                    }
+                val rs =
+                    databaseMetaData.getTables(databaseName, id.rawNamespace, id.rawName, null)
+                // When QUOTED_IDENTIFIERS_IGNORE_CASE is set to true, the raw table is
+                // interpreted as uppercase
+                // in db metadata calls. check for both
+                val rsUppercase =
+                    databaseMetaData.getTables(
+                        databaseName,
+                        id.rawNamespace.uppercase(),
+                        id.rawName.uppercase(),
+                        null
+                    )
+                rs.next() || rsUppercase.next()
             } catch (e: SQLException) {
                 LOGGER.error("Failed to retrieve table metadata", e)
                 throw RuntimeException(e)
@@ -287,6 +295,14 @@ class SnowflakeDestinationHandler(
             "VARIANT" == existingTable.columns[abMetaColumnName]!!.type
     }
 
+    fun isAirbyteGenerationIdColumnMatch(existingTable: TableDefinition): Boolean {
+        val abGenerationIdColumnName: String =
+            JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase(Locale.getDefault())
+        return existingTable.columns.containsKey(abGenerationIdColumnName) &&
+            toJdbcTypeName(AirbyteProtocolType.INTEGER) ==
+                existingTable.columns[abGenerationIdColumnName]!!.type
+    }
+
     @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE")
     override fun existingSchemaMatchesStreamConfig(
         stream: StreamConfig?,
@@ -299,7 +315,8 @@ class SnowflakeDestinationHandler(
         if (
             !isAirbyteRawIdColumnMatch(existingTable) ||
                 !isAirbyteExtractedAtColumnMatch(existingTable) ||
-                !isAirbyteMetaColumnMatch(existingTable)
+                !isAirbyteMetaColumnMatch(existingTable) ||
+                !isAirbyteGenerationIdColumnMatch(existingTable)
         ) {
             // Missing AB meta columns from final table, we need them to do proper T+D so trigger
             // soft-reset
@@ -417,8 +434,13 @@ class SnowflakeDestinationHandler(
     }
 
     override fun toDestinationState(json: JsonNode): SnowflakeState {
+        // Note the field name is isAirbyteMetaPresentInRaw but jackson interprets it as
+        // airbyteMetaPresentInRaw when serializing so we map that to the correct field when
+        // deserializing
         return SnowflakeState(
-            json.hasNonNull("needsSoftReset") && json["needsSoftReset"].asBoolean()
+            json.hasNonNull("needsSoftReset") && json["needsSoftReset"].asBoolean(),
+            json.hasNonNull("airbyteMetaPresentInRaw") &&
+                json["airbyteMetaPresentInRaw"].asBoolean()
         )
     }
 
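
Note: the rewritten lookup checks both casings because QUOTED_IDENTIFIERS_IGNORE_CASE = TRUE surfaces identifiers as uppercase in JDBC metadata calls. A sketch of the same dual-case existence check written with use {} so both ResultSets are closed (hypothetical helper, not part of the commit; getTables is the standard java.sql.DatabaseMetaData signature):

    import java.sql.DatabaseMetaData

    fun tableExistsEitherCase(
        md: DatabaseMetaData,
        database: String,
        schema: String,
        name: String,
    ): Boolean {
        // Exact-case match first, then the uppercase interpretation.
        return md.getTables(database, schema, name, null).use { it.next() } ||
            md.getTables(database, schema.uppercase(), name.uppercase(), null).use { it.next() }
    }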
