Skip to content

Commit f519db1

Browse files
authored
Destinations CDK: Minor cleanup for snowflake (#38572)
1 parent a51456a commit f519db1

File tree

5 files changed

+116
-1
lines changed

5 files changed

+116
-1
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.35.8 | 2024-05-22 | [\#38572](https://github.com/airbytehq/airbyte/pull/38572) | Add a temporary static method to decouple SnowflakeDestination from AbstractJdbcDestination |
177178
| 0.35.7 | 2024-05-20 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Decouple create namespace from per stream operation interface. |
178179
| 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
179180
| 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.35.7
1+
version=0.35.8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.destination.jdbc
6+
7+
import io.airbyte.cdk.db.jdbc.JdbcDatabase
8+
import io.airbyte.cdk.db.jdbc.JdbcUtils
9+
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
10+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
11+
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage
12+
import io.airbyte.commons.exceptions.ConnectionErrorException
13+
import io.airbyte.commons.json.Jsons
14+
import java.sql.Connection
15+
import java.sql.ResultSet
16+
import java.sql.SQLException
17+
import java.util.*
18+
19+
object JdbcCheckOperations {
20+
21+
/**
22+
* Verifies if provided creds has enough permissions. Steps are: 1. Create schema if not exists.
23+
* 2. Create test table. 3. Insert dummy record to newly created table if "attemptInsert" set to
24+
* true.
25+
* 4. Delete table created on step 2.
26+
*
27+
* @param outputSchema
28+
* - schema to tests against.
29+
* @param database
30+
* - database to tests against.
31+
* @param namingResolver
32+
* - naming resolver.
33+
* @param sqlOps
34+
* - SqlOperations object
35+
* @param attemptInsert
36+
* - set true if need to make attempt to insert dummy records to newly created table. Set false
37+
* to skip insert step.
38+
*/
39+
@JvmStatic
40+
@Throws(Exception::class)
41+
fun attemptTableOperations(
42+
outputSchema: String?,
43+
database: JdbcDatabase,
44+
namingResolver: NamingConventionTransformer,
45+
sqlOps: SqlOperations,
46+
attemptInsert: Boolean
47+
) {
48+
// verify we have write permissions on the target schema by creating a table with a
49+
// random name,
50+
// then dropping that table
51+
try {
52+
// Get metadata from the database to see whether connection is possible
53+
database.bufferedResultSetQuery(
54+
{ conn: Connection -> conn.metaData.catalogs },
55+
{ queryContext: ResultSet? ->
56+
JdbcUtils.defaultSourceOperations.rowToJson(queryContext!!)
57+
},
58+
)
59+
60+
// verify we have write permissions on the target schema by creating a table with a
61+
// random name,
62+
// then dropping that table
63+
val outputTableName =
64+
namingResolver.getIdentifier(
65+
"_airbyte_connection_test_" +
66+
UUID.randomUUID().toString().replace("-".toRegex(), ""),
67+
)
68+
sqlOps.createSchemaIfNotExists(database, outputSchema)
69+
sqlOps.createTableIfNotExists(database, outputSchema, outputTableName)
70+
// verify if user has permission to make SQL INSERT queries
71+
try {
72+
if (attemptInsert) {
73+
sqlOps.insertRecords(
74+
database,
75+
listOf(dummyRecord),
76+
outputSchema,
77+
outputTableName,
78+
)
79+
}
80+
} finally {
81+
sqlOps.dropTableIfExists(database, outputSchema, outputTableName)
82+
}
83+
} catch (e: SQLException) {
84+
if (Objects.isNull(e.cause) || e.cause !is SQLException) {
85+
throw ConnectionErrorException(e.sqlState, e.errorCode, e.message, e)
86+
} else {
87+
val cause = e.cause as SQLException?
88+
throw ConnectionErrorException(e.sqlState, cause!!.errorCode, cause.message, e)
89+
}
90+
} catch (e: Exception) {
91+
throw Exception(e)
92+
}
93+
}
94+
95+
private val dummyRecord: PartialAirbyteMessage
96+
/**
97+
* Generates a dummy AirbyteRecordMessage with random values.
98+
*
99+
* @return AirbyteRecordMessage object with dummy values that may be used to test insert
100+
* permission.
101+
*/
102+
get() {
103+
val dummyDataToInsert = Jsons.deserialize("{ \"field1\": true }")
104+
return PartialAirbyteMessage()
105+
.withRecord(
106+
PartialAirbyteRecordMessage()
107+
.withStream("stream1")
108+
.withEmittedAt(1602637589000L),
109+
)
110+
.withSerialized(dummyDataToInsert.toString())
111+
}
112+
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyConsumerFactory.kt

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import javax.sql.DataSource
2020
import org.slf4j.Logger
2121
import org.slf4j.LoggerFactory
2222

23+
// TODO: Delete this class, this is only used in StarburstGalaxyDestination
2324
object CopyConsumerFactory {
2425
private val LOGGER: Logger = LoggerFactory.getLogger(CopyConsumerFactory::class.java)
2526

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyDestination.kt

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import javax.sql.DataSource
2020
import org.slf4j.Logger
2121
import org.slf4j.LoggerFactory
2222

23+
// TODO: Delete this class, this is only used in StarburstGalaxyDestination
2324
abstract class CopyDestination : BaseConnector, Destination {
2425
/**
2526
* The default database schema field in the destination config is "schema". To change it, pass

0 commit comments

Comments
 (0)