Skip to content

Commit 459c37c

Browse files
committed
snowflake-stagingclient-enh
1 parent 3d8f7ca commit 459c37c

File tree

6 files changed

+303
-22
lines changed

6 files changed

+303
-22
lines changed

airbyte-integrations/connectors/destination-snowflake/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.35.14'
6+
cdkVersionRequired = '0.35.15'
77
features = ['db-destinations', 's3-destinations', 'typing-deduping']
88
useLocalCdk = false
99
}

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
8-
dockerImageTag: 3.9.0
8+
dockerImageTag: 3.9.1
99
dockerRepository: airbyte/destination-snowflake
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
1111
githubIssueLabel: destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44
package io.airbyte.integrations.destination.snowflake.operation
55

6+
import com.fasterxml.jackson.databind.JsonNode
67
import io.airbyte.cdk.db.jdbc.JdbcDatabase
78
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
89
import io.airbyte.commons.string.Strings.join
@@ -18,6 +19,22 @@ private val log = KotlinLogging.logger {}
1819
/** Client wrapper providing Snowflake Stage related operations. */
1920
class SnowflakeStagingClient(private val database: JdbcDatabase) {
2021

22+
/**
 * One parsed row of the result set returned by a `COPY INTO` table statement.
 *
 * Each property mirrors the result column it is read from: `file`, `status`, `rows_parsed`,
 * `rows_loaded`, `errors_seen` and `first_error`. [firstError] is null when the result row carries
 * no `first_error` value.
 */
private data class CopyIntoTableResult(
    val file: String,
    val copyStatus: CopyStatus,
    val rowsParsed: Int,
    val rowsLoaded: Int,
    val errorsSeen: Int,
    val firstError: String?,
)
30+
31+
/**
 * Load outcome parsed from the `status` column of a `COPY INTO` result row.
 *
 * [UNKNOWN] is the fallback for a status text that is not one of the other constants, and for
 * result rows that do not have the expected columns.
 */
private enum class CopyStatus {
    UNKNOWN,
    LOADED,
    LOAD_FAILED,
    PARTIALLY_LOADED,
}
37+
2138
// Most of the code here is preserved from
2239
// https://github.com/airbytehq/airbyte/blob/503b819b846663b0dff4c90322d0219a93e61d14/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java
2340
@Throws(IOException::class)
@@ -63,8 +80,18 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
6380
recordsData: SerializableBuffer
6481
) {
6582
val query = getPutQuery(stageName, stagingPath, recordsData.file!!.absolutePath)
66-
log.info { "Executing query: $query" }
67-
database.execute(query)
83+
val queryId = UUID.randomUUID()
84+
log.info { "executing query $queryId, $query" }
85+
val results = database.queryJsons(query)
86+
if (results.isNotEmpty() && (results.first().has("source_size"))) {
87+
if (results.first().get("source_size").asLong() == 0L) {
88+
// TODO: Should we break the Sync rather than proceeding with empty file for COPY ?
89+
log.warn {
90+
"query $queryId, uploaded an empty file, no new records will be inserted"
91+
}
92+
}
93+
}
94+
log.info { "query $queryId, completed with $results" }
6895
if (!checkStageObjectExists(stageName, stagingPath, recordsData.filename)) {
6996
log.error {
7097
"Failed to upload data into stage, object @${
@@ -84,7 +111,8 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
84111
filePath,
85112
stageName,
86113
stagingPath,
87-
Runtime.getRuntime().availableProcessors()
114+
// max allowed param is 99, we don't need so many threads for a single file upload
115+
minOf(Runtime.getRuntime().availableProcessors(), 4)
88116
)
89117
}
90118

@@ -144,14 +172,72 @@ class SnowflakeStagingClient(private val database: JdbcDatabase) {
144172
streamId: StreamId
145173
) {
146174
try {
175+
val queryId = UUID.randomUUID()
147176
val query = getCopyQuery(stageName, stagingPath, stagedFiles, streamId)
148-
log.info { "Executing query: $query" }
149-
database.execute(query)
177+
log.info { "query $queryId, $query" }
178+
// queryJsons is intentionally used here to get the error message in case of failure
179+
// instead of execute
180+
val results = database.queryJsons(query)
181+
if (results.isNotEmpty()) {
182+
// There will be only one row returned as the result of COPY INTO query
183+
val copyResult = getCopyResult(results.first())
184+
when (copyResult.copyStatus) {
185+
CopyStatus.LOADED ->
186+
log.info {
187+
"query $queryId, successfully loaded ${copyResult.rowsLoaded} rows of data into table"
188+
}
189+
CopyStatus.LOAD_FAILED -> {
190+
log.error {
191+
"query $queryId, failed to load data into table, " +
192+
"rows_parsed: ${copyResult.rowsParsed}, " +
193+
"rows_loaded: ${copyResult.rowsLoaded} " +
194+
"errors: ${copyResult.errorsSeen}, " +
195+
"firstError: ${copyResult.firstError}"
196+
}
197+
throw Exception(
198+
"COPY into table failed with ${copyResult.errorsSeen} errors, check logs"
199+
)
200+
}
201+
else -> log.warn { "query $queryId, unrecognized result format, $results" }
202+
}
203+
} else {
204+
log.warn { "query $queryId, no result returned" }
205+
}
150206
} catch (e: SQLException) {
151207
throw SnowflakeDatabaseUtils.checkForKnownConfigExceptions(e).orElseThrow { e }
152208
}
153209
}
154210

211+
/**
 * Converts one row of a `COPY INTO` result set into a [CopyIntoTableResult].
 *
 * The row is expected to contain `file`, `status`, `rows_parsed`, `rows_loaded` and
 * `errors_seen`. If any of them is missing, a default result with [CopyStatus.UNKNOWN] is
 * returned instead of throwing, as a safety net in case Snowflake changes the response format.
 *
 * @param result a single result row as a JSON object
 * @return the parsed result; `firstError` is null when `first_error` is absent or JSON null
 */
private fun getCopyResult(result: JsonNode): CopyIntoTableResult {
    val requiredFields = listOf("file", "status", "rows_parsed", "rows_loaded", "errors_seen")
    if (!requiredFields.all { result.has(it) }) {
        // Safety in case snowflake decides to change the response format:
        // instead of blowing up, we return a default object
        return CopyIntoTableResult("", CopyStatus.UNKNOWN, 0, 0, 0, null)
    }

    val status =
        when (result.get("status").asText()) {
            "LOADED" -> CopyStatus.LOADED
            "LOAD_FAILED" -> CopyStatus.LOAD_FAILED
            "PARTIALLY_LOADED" -> CopyStatus.PARTIALLY_LOADED
            else -> CopyStatus.UNKNOWN
        }

    return CopyIntoTableResult(
        file = result.get("file").asText(),
        copyStatus = status,
        rowsParsed = result.get("rows_parsed").asInt(),
        rowsLoaded = result.get("rows_loaded").asInt(),
        errorsSeen = result.get("errors_seen").asInt(),
        // An explicit JSON null must not become the literal string "null" via asText().
        firstError = result.get("first_error")?.takeUnless { it.isNull }?.asText(),
    )
}
240+
155241
/**
156242
* Creates a SQL query to bulk copy data into fully qualified destination table See
157243
* https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html for more context
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.snowflake.operation
6+
7+
import com.fasterxml.jackson.databind.JsonNode
8+
import io.airbyte.cdk.db.jdbc.JdbcDatabase
9+
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
10+
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer
11+
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator
12+
import io.airbyte.commons.json.Jsons
13+
import io.airbyte.commons.string.Strings
14+
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
15+
import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts
16+
import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils
17+
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
18+
import java.nio.file.Files
19+
import java.nio.file.Paths
20+
import java.time.Instant
21+
import java.util.*
22+
import org.junit.jupiter.api.AfterEach
23+
import org.junit.jupiter.api.Assertions.*
24+
import org.junit.jupiter.api.BeforeEach
25+
import org.junit.jupiter.api.Test
26+
27+
/**
 * Integration test for [SnowflakeStagingClient]: uploads CSV buffers to a stage and copies them
 * into a table, against the Snowflake instance configured in
 * `secrets/1s1t_internal_staging_config.json`.
 *
 * Each test runs in its own randomly suffixed schema, which is created in [setUp] and dropped in
 * [tearDown].
 */
class SnowflakeStagingClientIntegrationTest {

    private lateinit var stagingClient: SnowflakeStagingClient

    // Deliberately not lateinit, to keep spotBugs happy: these vars are read inside setUp,
    // and spotBugs flags the null check that lateinit access generates in the bytecode.
    private var namespace: String = ""
    private var tablename: String = ""

    private lateinit var stageName: String
    private val config =
        Jsons.deserialize(Files.readString(Paths.get("secrets/1s1t_internal_staging_config.json")))
    private val datasource =
        SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
    private val database: JdbcDatabase = SnowflakeDatabaseUtils.getDatabase(datasource)

    /** Creates a fresh schema, a stage inside it, and the three-column target table. */
    @BeforeEach
    fun setUp() {
        namespace = Strings.addRandomSuffix("staging_client_test", "_", 5).uppercase()
        tablename = "integration_test_raw".uppercase()
        val createSchemaQuery =
            """
            CREATE SCHEMA "$namespace"
            """.trimIndent()
        // Intentionally not using actual columns, since the staging client should be agnostic
        // of these and only follow the order of data.
        val createStagingTableQuery =
            """
            CREATE TABLE IF NOT EXISTS "$namespace"."$tablename" (
                "id" VARCHAR PRIMARY KEY,
                "emitted_at" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
                "data" VARIANT
            )
            """.trimIndent()
        stageName = """"$namespace"."${Strings.addRandomSuffix("stage", "_", 5)}""""
        stagingClient = SnowflakeStagingClient(database)
        database.execute(createSchemaQuery)
        stagingClient.createStageIfNotExists(stageName)
        database.execute(createStagingTableQuery)
    }

    /** Drops the stage and then the whole test schema. */
    @AfterEach
    fun tearDown() {
        try {
            stagingClient.dropStageIfExists(stageName)
        } finally {
            // Still drop the schema when dropping the stage fails, so test schemas don't pile up.
            database.execute("DROP SCHEMA IF EXISTS \"$namespace\" CASCADE")
        }
    }

    /** Two records with three values each should end up as two rows in the target table. */
    @Test
    fun verifyUploadAndCopyToTableSuccess() {
        val writeBuffer = newCsvBuffer(sheetGenerator())
        val streamId = StreamId("unused", "unused", namespace, tablename, "unused", "unused")
        val stagingPath = "${UUID.randomUUID()}/test/"
        writeBuffer.use {
            it.accept(""" {"dummyKey": "dummyValue"} """, "", System.currentTimeMillis())
            it.accept(""" {"dummyKey": "dummyValue"} """, "", System.currentTimeMillis())
            it.flush()
            val fileName = stagingClient.uploadRecordsToStage(writeBuffer, stageName, stagingPath)
            stagingClient.copyIntoTableFromStage(stageName, stagingPath, listOf(fileName), streamId)
        }
        val results = readTable(streamId)
        assertEquals(2, results.size)
        assertNotNull(results.first().get("id"))
        assertNotNull(results.first().get("emitted_at"))
        assertNotNull(results.first().get("data"))
    }

    /**
     * A record with four values does not fit the three-column table: the copy is expected to
     * throw and the table to stay empty.
     */
    @Test
    fun verifyUploadAndCopyToTableFailureOnMismatchedColumns() {
        val writeBuffer = newCsvBuffer(sheetGenerator("unknown_data_column"))
        val streamId = StreamId("unused", "unused", namespace, tablename, "unused", "unused")
        val stagingPath = "${UUID.randomUUID()}/test/"
        writeBuffer.use {
            it.accept(""" {"dummyKey": "dummyValue"} """, "", System.currentTimeMillis())
            it.flush()
            val fileName = stagingClient.uploadRecordsToStage(writeBuffer, stageName, stagingPath)
            assertThrows(Exception::class.java) {
                stagingClient.copyIntoTableFromStage(
                    stageName,
                    stagingPath,
                    listOf(fileName),
                    streamId,
                )
            }
        }
        val results = readTable(streamId)
        assertTrue(results.isEmpty())
    }

    /**
     * Builds a [CsvSheetGenerator] whose four-argument `getDataRow` emits the id, the emitted-at
     * instant and the formatted string, followed by [extraColumns]. Every other method throws,
     * because these tests are not expected to call them.
     */
    private fun sheetGenerator(vararg extraColumns: Any): CsvSheetGenerator =
        object : CsvSheetGenerator {
            override fun getDataRow(formattedData: JsonNode): List<Any> {
                throw NotImplementedError("This method should not be called in this test")
            }

            override fun getDataRow(id: UUID, recordMessage: AirbyteRecordMessage): List<Any> {
                throw NotImplementedError("This method should not be called in this test")
            }

            override fun getDataRow(
                id: UUID,
                formattedString: String,
                emittedAt: Long,
                formattedAirbyteMetaString: String
            ): List<Any> =
                listOf<Any>(id, Instant.ofEpochMilli(emittedAt), formattedString) + extraColumns

            override fun getHeaderRow(): List<String> {
                throw NotImplementedError("This method should not be called in this test")
            }
        }

    /** Wraps [generator] in a file-backed CSV buffer using the `CSV_GZ_SUFFIX` file suffix. */
    private fun newCsvBuffer(generator: CsvSheetGenerator) =
        CsvSerializedBuffer(
            FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
            generator,
            true,
        )

    /** Reads every row of the table addressed by the raw namespace and raw name of [streamId]. */
    private fun readTable(streamId: StreamId) =
        database.queryJsons(
            "SELECT * FROM \"${streamId.rawNamespace}\".\"${streamId.rawName}\""
        )
}

0 commit comments

Comments
 (0)