
Commit 0ac4d27

gisripa authored and xiaohansong committed
Destination Snowflake: Storage ops to support refreshes (#39473)
1 parent 1e1b8e5 · commit 0ac4d27

23 files changed, +507 -135 lines

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 5 additions & 4 deletions
@@ -174,18 +174,18 @@ corresponds to that version.
 
 | Version | Date | Pull Request | Subject |
 |:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| 0.40.8 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
-| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
-| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
+| 0.40.9 | 2024-07-01 | [\#39473](https://github.com/airbytehq/airbyte/pull/39473) | minor changes around error logging and testing |
+| 0.40.8 | 2024-07-01 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
+| 0.40.7 | 2024-07-01 | [\#40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
 | ~~0.40.6~~ | | | (this version does not exist) |
 | 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
-| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
 | 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
 | 0.40.3 | 2024-06-18 | [\#39526](https://github.com/airbytehq/airbyte/pull/39526) | Destinations: INCOMPLETE stream status is a TRANSIENT error rather than SYSTEM |
 | 0.40.2 | 2024-06-18 | [\#39552](https://github.com/airbytehq/airbyte/pull/39552) | Destinations: Throw error if the ConfiguredCatalog has no streams |
 | 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |
 | 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation |
 | 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
+| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
 | 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
 | 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. |
 | 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to |
@@ -198,6 +198,7 @@ corresponds to that version.
 | 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
 | 0.36.2 | 2024-05-29 | [\#38538](https://github.com/airbytehq/airbyte/pull/38357) | Exit connector when encountering a config error. |
 | 0.36.0 | 2024-05-29 | [\#38358](https://github.com/airbytehq/airbyte/pull/38358) | Plumb generation_id / sync_id to destinations code |
+| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
 | 0.35.15 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
 | 0.35.14 | 2024-05-28 | [\#38738](https://github.com/airbytehq/airbyte/pull/38738) | make ThreadCreationInfo cast as nullable |
 | 0.35.13 | 2024-05-28 | [\#38632](https://github.com/airbytehq/airbyte/pull/38632) | minor changes to allow conversion of snowflake tests to kotlin |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

Lines changed: 1 addition & 0 deletions
@@ -213,6 +213,7 @@ internal constructor(
                 }
             }
         } catch (e: Exception) {
+            LOGGER.error(e) { "caught exception!" }
             // Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An
             // attempt is made
             // to
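
For context, the added line uses kotlin-logging's lazy-message overload that also takes a throwable. A minimal, self-contained sketch of the same pattern, assuming kotlin-logging 5.x (the wrapper function below is illustrative, not the runner's actual structure):

```kotlin
import io.github.oshai.kotlinlogging.KotlinLogging

private val LOGGER = KotlinLogging.logger {}

// Illustrative wrapper: log the original exception (with its stack trace) before
// any later unwrapping or re-wrapping can obscure the root cause, then rethrow.
fun <T> logAndRethrow(block: () -> T): T =
    try {
        block()
    } catch (e: Exception) {
        LOGGER.error(e) { "caught exception!" }
        throw e
    }
```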

version.properties

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-version=0.40.8
+version=0.40.9

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt

Lines changed: 1 addition & 1 deletion
@@ -135,7 +135,7 @@ constructor(
 
     @VisibleForTesting
     fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
-        if (stream.generationId == null) {
+        if (stream.generationId == null || stream.minimumGenerationId == null) {
             throw ConfigErrorException(
                 "You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to 0.63.0"
             )
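
The guard now requires both generationId and minimumGenerationId before building a StreamConfig. A minimal sketch of the same validation in isolation, using stand-in types (the real ConfiguredAirbyteStream and ConfigErrorException come from the protocol models and the CDK):

```kotlin
// Stand-ins for illustration only.
data class ConfiguredStream(val generationId: Long?, val minimumGenerationId: Long?)

class ConfigErrorException(message: String) : RuntimeException(message)

fun requireGenerationIds(stream: ConfiguredStream) {
    // Platforms >= 0.63.0 populate both fields; a missing value means the platform
    // cannot drive refreshes, so fail fast with a config error rather than a crash.
    if (stream.generationId == null || stream.minimumGenerationId == null) {
        throw ConfigErrorException(
            "You must upgrade your platform version to use this connector version. " +
                "Either downgrade your connector or upgrade platform to 0.63.0"
        )
    }
}
```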

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt

Lines changed: 57 additions & 25 deletions
@@ -40,8 +40,6 @@ import org.junit.jupiter.api.Assumptions.assumeTrue
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.parallel.Execution
 import org.junit.jupiter.api.parallel.ExecutionMode
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
 
 private val LOGGER = KotlinLogging.logger {}
 /**
@@ -230,15 +228,11 @@ abstract class BaseTypingDedupingTest {
      * Starting with an empty destination, execute a full refresh overwrite sync. Verify that the
      * records are written to the destination table. Then run a second sync, and verify that the
      * records are overwritten.
-     *
-     * Parameterized on destination sync mode. After the refreshes project, APPEND and OVERWRITE
-     * behave identically.
      */
-    @ParameterizedTest
-    @EnumSource(DestinationSyncMode::class, names = ["APPEND", "OVERWRITE"])
     @Throws(Exception::class)
-    fun truncateRefresh() {
-        val catalog =
+    @Test
+    open fun truncateRefresh() {
+        val catalog1 =
             io.airbyte.protocol.models.v0
                 .ConfiguredAirbyteCatalog()
                 .withStreams(
@@ -247,8 +241,8 @@ abstract class BaseTypingDedupingTest {
                             .withSyncId(42)
                             .withGenerationId(43)
                             .withMinimumGenerationId(43)
+                            .withDestinationSyncMode(DestinationSyncMode.APPEND)
                             .withSyncMode(SyncMode.FULL_REFRESH)
-                            .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
                             .withStream(
                                 AirbyteStream()
                                     .withNamespace(streamNamespace)
@@ -261,36 +255,54 @@ abstract class BaseTypingDedupingTest {
         // First sync
         val messages1 = readMessages("dat/sync1_messages.jsonl")
 
-        runSync(catalog, messages1)
+        runSync(catalog1, messages1)
 
         val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
         val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
         verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())
 
         // Second sync
         val messages2 = readMessages("dat/sync2_messages.jsonl")
+        val catalog2 =
+            io.airbyte.protocol.models.v0
+                .ConfiguredAirbyteCatalog()
+                .withStreams(
+                    java.util.List.of(
+                        ConfiguredAirbyteStream()
+                            .withSyncId(42)
+                            .withGenerationId(44)
+                            .withMinimumGenerationId(44)
+                            .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
+                            .withSyncMode(SyncMode.FULL_REFRESH)
+                            .withStream(
+                                AirbyteStream()
+                                    .withNamespace(streamNamespace)
+                                    .withName(streamName)
+                                    .withJsonSchema(SCHEMA)
+                            )
+                    )
+                )
 
-        runSync(catalog, messages2)
+        runSync(catalog2, messages2)
 
         val expectedRawRecords2 =
-            readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl")
+            readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl")
         val expectedFinalRecords2 =
-            readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl")
+            readRecords(
+                "dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl"
+            )
         verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
     }
 
     /**
      * Starting with an empty destination, execute a full refresh append sync. Verify that the
      * records are written to the destination table. Then run a second sync, and verify that the old
      * and new records are all present.
-     *
-     * Similar to [truncateRefresh], this is parameterized on sync mode.
      */
-    @ParameterizedTest
-    @EnumSource(DestinationSyncMode::class, names = ["APPEND", "OVERWRITE"])
     @Throws(Exception::class)
-    fun mergeRefresh() {
-        val catalog =
+    @Test
+    open fun mergeRefresh() {
+        val catalog1 =
             io.airbyte.protocol.models.v0
                 .ConfiguredAirbyteCatalog()
                 .withStreams(
@@ -299,8 +311,8 @@ abstract class BaseTypingDedupingTest {
                             .withSyncId(42)
                             .withGenerationId(43)
                             .withMinimumGenerationId(0)
-                            .withSyncMode(SyncMode.FULL_REFRESH)
                             .withDestinationSyncMode(DestinationSyncMode.APPEND)
+                            .withSyncMode(SyncMode.FULL_REFRESH)
                             .withStream(
                                 AirbyteStream()
                                     .withNamespace(streamNamespace)
@@ -313,20 +325,39 @@ abstract class BaseTypingDedupingTest {
         // First sync
         val messages1 = readMessages("dat/sync1_messages.jsonl")
 
-        runSync(catalog, messages1)
+        runSync(catalog1, messages1)
 
         val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
         val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
         verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())
 
         // Second sync
         val messages2 = readMessages("dat/sync2_messages.jsonl")
+        val catalog2 =
+            io.airbyte.protocol.models.v0
+                .ConfiguredAirbyteCatalog()
+                .withStreams(
+                    java.util.List.of(
+                        ConfiguredAirbyteStream()
+                            .withSyncId(42)
+                            .withGenerationId(44)
+                            .withMinimumGenerationId(0)
+                            .withDestinationSyncMode(DestinationSyncMode.APPEND)
+                            .withSyncMode(SyncMode.FULL_REFRESH)
+                            .withStream(
+                                AirbyteStream()
+                                    .withNamespace(streamNamespace)
+                                    .withName(streamName)
+                                    .withJsonSchema(SCHEMA)
+                            )
+                    )
+                )
 
-        runSync(catalog, messages2)
+        runSync(catalog2, messages2)
 
-        val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl")
+        val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl")
         val expectedFinalRecords2 =
-            readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl")
+            readRecords("dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl")
         verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
     }
 
@@ -1024,6 +1055,7 @@ abstract class BaseTypingDedupingTest {
         disableFinalTableComparison: Boolean
     ) {
         val actualRawRecords = dumpRawTableRecords(streamNamespace, streamName)
+
         if (disableFinalTableComparison) {
            DIFFER!!.diffRawTableRecords(expectedRawRecords, actualRawRecords)
         } else {
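
Since the second sync now needs its own catalog with a bumped generation, each test builds the ConfiguredAirbyteCatalog twice. A hypothetical helper along these lines could factor out the repetition; it is shown as it would sit inside BaseTypingDedupingTest, reusing the class's existing imports and fields (streamNamespace, streamName, SCHEMA), and is not part of this commit:

```kotlin
// Hypothetical helper (not in the commit): builds a single-stream full-refresh
// catalog for a given generation / minimum-generation pair, mirroring the builder
// chain used in truncateRefresh() and mergeRefresh().
private fun fullRefreshCatalog(
    generationId: Long,
    minimumGenerationId: Long,
    destinationSyncMode: DestinationSyncMode,
): io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog =
    io.airbyte.protocol.models.v0
        .ConfiguredAirbyteCatalog()
        .withStreams(
            java.util.List.of(
                ConfiguredAirbyteStream()
                    .withSyncId(42)
                    .withGenerationId(generationId)
                    .withMinimumGenerationId(minimumGenerationId)
                    .withDestinationSyncMode(destinationSyncMode)
                    .withSyncMode(SyncMode.FULL_REFRESH)
                    .withStream(
                        AirbyteStream()
                            .withNamespace(streamNamespace)
                            .withName(streamName)
                            .withJsonSchema(SCHEMA)
                    )
            )
        )

// Truncate refresh: the generation advances from 43 to 44 and the minimum tracks it.
// val catalog1 = fullRefreshCatalog(43, 43, DestinationSyncMode.APPEND)
// val catalog2 = fullRefreshCatalog(44, 44, DestinationSyncMode.OVERWRITE)
// Merge refresh: the generation advances but the minimum stays at 0 (old data is kept).
// val catalog1 = fullRefreshCatalog(43, 0, DestinationSyncMode.APPEND)
// val catalog2 = fullRefreshCatalog(44, 0, DestinationSyncMode.APPEND)
```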

airbyte-integrations/connectors/destination-snowflake/build.gradle

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ plugins {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.37.1'
+    cdkVersionRequired = '0.40.9'
     features = ['db-destinations', 's3-destinations', 'typing-deduping']
     useLocalCdk = false
 }

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

Lines changed: 2 additions & 1 deletion
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.10.1
+  dockerImageTag: 3.11.0
   dockerRepository: airbyte/destination-snowflake
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
   githubIssueLabel: destination-snowflake
@@ -34,6 +34,7 @@ data:
       memory_request: 2Gi
   supportLevel: certified
   supportsDbt: true
+  supportsRefreshes: true
   tags:
     - language:java
   connectorTestSuitesOptions:

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt

Lines changed: 6 additions & 0 deletions
@@ -122,6 +122,12 @@ constructor(
                     hasUnprocessedRecords = true,
                     maxProcessedTimestamp = Optional.empty()
                 ),
+            initialTempRawTableStatus =
+                InitialRawTableStatus(
+                    rawTableExists = false,
+                    hasUnprocessedRecords = true,
+                    maxProcessedTimestamp = Optional.empty()
+                ),
             isSchemaMismatch = true,
             isFinalTableEmpty = true,
             destinationState =
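
For orientation, the placeholder initial status built here now also carries a status for the temporary raw table; both raw-table statuses use the same "nothing to resume" default since no real tables have been inspected at this point. A sketch with a stand-in data class (the real InitialRawTableStatus lives in the CDK's typing-deduping package; field names match the diff, the types are assumed):

```kotlin
import java.time.Instant
import java.util.Optional

// Stand-in for the CDK's InitialRawTableStatus, shown only to illustrate the
// fields populated above.
data class InitialRawTableStatus(
    val rawTableExists: Boolean,
    val hasUnprocessedRecords: Boolean,
    val maxProcessedTimestamp: Optional<Instant>,
)

// Placeholder used for both initialRawTableStatus and initialTempRawTableStatus.
val emptyStatus =
    InitialRawTableStatus(
        rawTableExists = false,
        hasUnprocessedRecords = true,
        maxProcessedTimestamp = Optional.empty(),
    )
```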

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/migrations/SnowflakeAbMetaAndGenIdMigration.kt

Lines changed: 8 additions & 0 deletions
@@ -122,6 +122,14 @@ class SnowflakeAbMetaAndGenIdMigration(private val database: JdbcDatabase) :
                 state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
                 true
             )
+        } else if (!state.isFinalTablePresent) {
+            log.info {
+                "skipping migration of generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName} because final table doesn't exist"
+            }
+        } else {
+            log.info {
+                "skipping migration of generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName} because schemas match"
+            }
         }
 
         // Final table is untouched, so we don't need to fetch the initial status

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt

Lines changed: 6 additions & 4 deletions
@@ -169,11 +169,12 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
         stageName: String,
         stagingPath: String,
         stagedFiles: List<String>,
-        streamId: StreamId
+        streamId: StreamId,
+        suffix: String = ""
     ) {
         try {
             val queryId = UUID.randomUUID()
-            val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId)
+            val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId, suffix)
             log.info { "query $queryId, $query" }
             // queryJsons is intentionally used here to get the error message in case of failure
             // instead of execute
@@ -252,12 +253,13 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
         stageName: String,
         stagingPath: String,
         stagedFiles: List<String>,
-        streamId: StreamId
+        streamId: StreamId,
+        suffix: String
     ): String {
         return String.format(
             COPY_QUERY_1S1T + generateFilesList(stagedFiles) + ";",
             streamId.rawNamespace,
-            streamId.rawName,
+            streamId.rawName + suffix,
             stageName,
             stagingPath
         )