Commit a4d667f

destination-snowflake: bump CDK
1 parent: 6d74db7

9 files changed: +120 additions, −74 deletions

airbyte-integrations/connectors/destination-snowflake/build.gradle

Lines changed: 1 addition & 1 deletion

@@ -3,7 +3,7 @@ plugins {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.44.19'
+    cdkVersionRequired = '0.45.0'
     features = ['db-destinations', 's3-destinations', 'typing-deduping']
     useLocalCdk = false
 }

airbyte-integrations/connectors/destination-snowflake/gradle.properties

Lines changed: 2 additions & 1 deletion

@@ -1 +1,2 @@
-JunitMethodExecutionTimeout=30 m
+JunitMethodExecutionTimeout=20 m
+testExecutionConcurrency=2

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

Lines changed: 1 addition & 1 deletion

@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.11.11
+  dockerImageTag: 3.11.12
   dockerRepository: airbyte/destination-snowflake
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
   githubIssueLabel: destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDatabaseUtils.kt

Lines changed: 1 addition & 1 deletion

@@ -57,7 +57,7 @@ object SnowflakeDatabaseUtils {
     private const val IP_NOT_IN_WHITE_LIST_ERR_MSG = "not allowed to access Snowflake"
 
     @JvmStatic
-    fun createDataSource(config: JsonNode, airbyteEnvironment: String?): HikariDataSource {
+    fun createDataSource(config: JsonNode, airbyteEnvironment: String?): DataSource {
 
         val dataSource = HikariDataSource()
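
The only functional change above is the declared return type: callers of createDataSource now receive the javax.sql.DataSource interface instead of the concrete HikariDataSource. A minimal sketch of that pattern, assuming HikariCP and placeholder connection values rather than the connector's real JsonNode-driven wiring:

import com.zaxxer.hikari.HikariDataSource
import javax.sql.DataSource

// Sketch only: build a HikariCP pool internally, but expose the plain javax.sql.DataSource
// interface so the pooling library stays an implementation detail of the factory.
// The URL and credentials here are placeholders, not the connector's actual config handling.
fun createPooledDataSource(jdbcUrl: String, user: String, password: String): DataSource {
    val pool = HikariDataSource()
    pool.jdbcUrl = jdbcUrl       // e.g. "jdbc:snowflake://<account>.snowflakecomputing.com/"
    pool.username = user
    pool.password = password
    pool.maximumPoolSize = 5     // illustrative pool sizing only
    return pool                  // exposed as DataSource; close via (pool as AutoCloseable) when done
}

Returning the interface lets callers wrap or close the pool without compiling against HikariCP directly.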

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt

Lines changed: 11 additions & 13 deletions

@@ -316,19 +316,17 @@ constructor(
 }
 
 fun main(args: Array<String>) {
-    IntegrationRunner.addOrphanedThreadFilter { t: Thread ->
-        if (IntegrationRunner.getThreadCreationInfo(t) != null) {
-            for (stackTraceElement in IntegrationRunner.getThreadCreationInfo(t)!!.stack) {
-                val stackClassName = stackTraceElement.className
-                val stackMethodName = stackTraceElement.methodName
-                if (
-                    SFStatement::class.java.canonicalName == stackClassName &&
-                        "close" == stackMethodName ||
-                        SFSession::class.java.canonicalName == stackClassName &&
-                            "callHeartBeatWithQueryTimeout" == stackMethodName
-                ) {
-                    return@addOrphanedThreadFilter false
-                }
+    IntegrationRunner.addOrphanedThreadFilter { threadInfo: IntegrationRunner.OrphanedThreadInfo ->
+        for (stackTraceElement in threadInfo.threadCreationInfo.stack) {
+            val stackClassName = stackTraceElement.className
+            val stackMethodName = stackTraceElement.methodName
+            if (
+                SFStatement::class.java.canonicalName == stackClassName &&
+                    "close" == stackMethodName ||
+                    SFSession::class.java.canonicalName == stackClassName &&
+                        "callHeartBeatWithQueryTimeout" == stackMethodName
+            ) {
+                return@addOrphanedThreadFilter false
             }
         }
         true
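
The rewritten filter closes a check-then-act race: the old lambda called IntegrationRunner.getThreadCreationInfo(t) twice, null-checking the first result and dereferencing the second with !!, which could throw if the entry was removed in between. The new CDK callback hands the filter a single, already-resolved OrphanedThreadInfo snapshot. A small self-contained sketch of the two shapes, using a hypothetical registry rather than the CDK's actual bookkeeping:

import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the CDK's per-thread bookkeeping, used only to illustrate the race.
data class ThreadCreationInfo(val stack: List<StackTraceElement>)

object ThreadRegistry {
    private val info = ConcurrentHashMap<Thread, ThreadCreationInfo>()
    fun register(t: Thread) {
        info[t] = ThreadCreationInfo(Thread.currentThread().stackTrace.toList())
    }
    fun lookup(t: Thread): ThreadCreationInfo? = info[t] // entry may vanish at any moment
}

// Old shape: two lookups. The second may return null after the first succeeded,
// so the !! dereference can throw under concurrent modification.
fun racyFilter(t: Thread): Boolean =
    ThreadRegistry.lookup(t) != null &&
        ThreadRegistry.lookup(t)!!.stack.none { it.methodName == "close" }

// New shape: resolve once, then work only with the immutable snapshot,
// mirroring the OrphanedThreadInfo parameter in the diff above.
fun safeFilter(snapshot: ThreadCreationInfo): Boolean =
    snapshot.stack.none { it.methodName == "close" }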

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.kt

Lines changed: 9 additions & 4 deletions

@@ -33,15 +33,20 @@ internal class SnowflakeDestinationIntegrationTest {
     @Throws(Exception::class)
     fun testCheckFailsWithInvalidPermissions() {
         // TODO(sherifnada) this test case is assumes config.json does not have permission to access
-        // the
-        // schema
+        // the schema RESTRICTED_SCHEMA was created by the user AIRBYTETESTER, and then permissions
+        // were removed with
+        // 'REVOKE ALL ON SCHEMA restricted_schema FROM ROLE integration_tester_destination;'
         // this connector should be updated with multiple credentials, each with a clear purpose
-        // (valid,
-        // invalid: insufficient permissions, invalid: wrong password, etc..)
+        // (valid, invalid: insufficient permissions, invalid: wrong password, etc..)
         val credentialsJsonString = deserialize(Files.readString(Paths.get("secrets/config.json")))
         val check =
             SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString)
         Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, check!!.status)
+        Assertions.assertEquals(
+            "Could not connect with provided configuration. Encountered Error with Snowflake Configuration: " +
+                "Current role does not have permissions on the target schema please verify your privileges",
+            check.message
+        )
     }
 
     @Test

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeSqlGeneratorIntegrationTest.kt

Lines changed: 0 additions & 16 deletions

@@ -6,7 +6,6 @@ package io.airbyte.integrations.destination.snowflake.typing_deduping
 import com.fasterxml.jackson.databind.JsonNode
 import com.google.common.collect.ImmutableMap
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
-import io.airbyte.cdk.db.factory.DataSourceFactory
 import io.airbyte.cdk.db.jdbc.JdbcDatabase
 import io.airbyte.cdk.db.jdbc.JdbcUtils
 import io.airbyte.cdk.integrations.base.JavaBaseConstants

@@ -1857,20 +1856,5 @@ abstract class AbstractSnowflakeSqlGeneratorIntegrationTest :
         private var dataSource: DataSource =
             SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
         private var database: JdbcDatabase = SnowflakeDatabaseUtils.getDatabase(dataSource)
-
-        @JvmStatic
-        @BeforeAll
-        fun setupSnowflake(): Unit {
-            dataSource =
-                SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
-            database = SnowflakeDatabaseUtils.getDatabase(dataSource)
-        }
-
-        @JvmStatic
-        @AfterAll
-        @Throws(Exception::class)
-        fun teardownSnowflake(): Unit {
-            DataSourceFactory.close(dataSource)
-        }
     }
 }
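
Because dataSource and database are initialized at their declarations (visible in the unchanged context lines), the @BeforeAll that re-assigned the same values was redundant, and the @AfterAll close is dropped with it, leaving teardown to JVM exit. A small sketch of the resulting lifecycle, with a hypothetical ExpensiveResource standing in for the Snowflake DataSource/JdbcDatabase pair:

import org.junit.jupiter.api.Test

// Hypothetical resource: constructed once, when the companion object is first loaded.
class ExpensiveResource {
    init { println("created once per test class load") }
    fun query(sql: String): String = "result of $sql"
}

class ResourceLifecycleTest {
    companion object {
        // Eager initialization at declaration; no @JvmStatic @BeforeAll re-assignment needed.
        private val resource = ExpensiveResource()
    }

    @Test
    fun usesSharedResource() {
        println(resource.query("SELECT 1"))
    }
}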

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt

Lines changed: 94 additions & 37 deletions

@@ -21,9 +21,13 @@ import io.airbyte.workers.exception.TestHarnessException
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.nio.file.Path
 import java.sql.SQLException
+import java.time.Instant
+import java.time.temporal.ChronoUnit
 import java.util.*
+import java.util.concurrent.Executors
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
 import javax.sql.DataSource
-import kotlin.concurrent.Volatile
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Disabled

@@ -56,7 +60,7 @@ abstract class AbstractSnowflakeTypingDedupingTest(
         dataSource =
             SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
         database = SnowflakeDatabaseUtils.getDatabase(dataSource)
-        cleanAirbyteInternalTable(databaseName, database, forceUppercaseIdentifiers)
+        cleanAirbyteInternalTable(database)
         return config
     }

@@ -419,48 +423,101 @@ abstract class AbstractSnowflakeTypingDedupingTest(
             "_AIRBYTE_GENERATION_ID",
         )

-        @Volatile private var cleanedAirbyteInternalTable = false
+        private val cleanedAirbyteInternalTable = AtomicBoolean(false)
+        private val threadId = AtomicInteger(0)

         @Throws(SQLException::class)
-        private fun cleanAirbyteInternalTable(
-            databaseName: String,
-            database: JdbcDatabase?,
-            forceUppercase: Boolean,
-        ) {
-            if (!cleanedAirbyteInternalTable) {
-                synchronized(AbstractSnowflakeTypingDedupingTest::class.java) {
-                    if (!cleanedAirbyteInternalTable) {
-                        val destinationStateTableExists =
-                            database!!.executeMetadataQuery {
-                                it.getTables(
-                                    databaseName,
-                                    if (forceUppercase) {
-                                        "AIRBYTE_INTERNAL"
-                                    } else {
-                                        "airbyte_internal"
-                                    },
-                                    if (forceUppercase) {
-                                        "_AIRBYTE_DESTINATION_STATE"
-                                    } else {
-                                        "_airbyte_destination_state"
-                                    },
-                                    null
-                                )
-                                    .next()
-                            }
-                        if (destinationStateTableExists) {
-                            database.execute(
-                                """DELETE FROM "airbyte_internal"."_airbyte_destination_state" WHERE "updated_at" < current_date() - 7""",
+        private fun cleanAirbyteInternalTable(database: JdbcDatabase?) {
+            if (
+                database!!
+                    .queryJsons("SHOW PARAMETERS LIKE 'QUOTED_IDENTIFIERS_IGNORE_CASE';")
+                    .first()
+                    .get("value")
+                    .asText()
+                    .toBoolean()
+            ) {
+                return
+            }
+
+            if (!cleanedAirbyteInternalTable.getAndSet(true)) {
+                val cleanupCutoffHours = 6
+                LOGGER.info { "tableCleaner running" }
+                val executor =
+                    Executors.newSingleThreadExecutor {
+                        val thread = Executors.defaultThreadFactory().newThread(it)
+                        thread.name =
+                            "airbyteInternalTableCleanupThread-${threadId.incrementAndGet()}"
+                        thread.isDaemon = true
+                        thread
+                    }
+                executor.execute {
+                    database.execute(
+                        "DELETE FROM \"airbyte_internal\".\"_airbyte_destination_state\" WHERE \"updated_at\" < timestampadd('hours', -$cleanupCutoffHours, current_timestamp())",
+                    )
+                }
+                executor.execute {
+                    database.execute(
+                        "DELETE FROM \"AIRBYTE_INTERNAL\".\"_AIRBYTE_DESTINATION_STATE\" WHERE \"UPDATED_AT\" < timestampadd('hours', -$cleanupCutoffHours, current_timestamp())",
+                    )
+                }
+                executor.execute {
+                    val schemaList =
+                        database.queryJsons(
+                            "SHOW SCHEMAS IN DATABASE INTEGRATION_TEST_DESTINATION;",
+                        )
+                    LOGGER.info(
+                        "tableCleaner found ${schemaList.size} schemas in database INTEGRATION_TEST_DESTINATION"
+                    )
+                    schemaList
+                        .associate {
+                            it.get("name").asText() to Instant.parse(it.get("created_on").asText())
+                        }
+                        .filter {
+                            it.value.isBefore(
+                                Instant.now().minus(cleanupCutoffHours.toLong(), ChronoUnit.HOURS)
                             )
                         }
-                        cleanedAirbyteInternalTable = true
+                        .filter {
+                            it.key.startsWith("SQL_GENERATOR", ignoreCase = true) ||
+                                it.key.startsWith("TDTEST", ignoreCase = true) ||
+                                it.key.startsWith("TYPING_DEDUPING", ignoreCase = true)
+                        }
+                        .forEach {
+                            executor.execute {
+                                database.execute(
+                                    "DROP SCHEMA INTEGRATION_TEST_DESTINATION.\"${it.key}\" /* created at ${it.value} */;"
+                                )
+                            }
+                        }
+                }
+                for (schemaName in
+                    listOf("AIRBYTE_INTERNAL", "airbyte_internal", "overridden_raw_dataset")) {
+                    executor.execute {
+                        val sql =
+                            "SHOW TABLES IN schema INTEGRATION_TEST_DESTINATION.\"$schemaName\";"
+                        val tableList = database.queryJsons(sql)
+                        LOGGER.info {
+                            "tableCleaner found ${tableList.size} tables in schema $schemaName"
+                        }
+                        tableList
+                            .associate {
+                                it.get("name").asText() to
+                                    Instant.parse(it.get("created_on").asText())
+                            }
+                            .filter {
+                                it.value.isBefore(Instant.now().minus(6, ChronoUnit.HOURS)) &&
+                                    it.key.startsWith("TDTEST", ignoreCase = true)
+                            }
+                            .forEach {
+                                executor.execute {
+                                    database.execute(
+                                        "DROP TABLE INTEGRATION_TEST_DESTINATION.\"$schemaName\".\"${it.key}\" /* created at ${it.value} */;"
+                                    )
+                                }
+                            }
                     }
                 }
             }
         }
     }
 }
-
-open class Batch(val name: String)
-
-class LocalFileBatch(name: String) : Batch(name)
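
Two concurrency patterns carry the rewrite above: AtomicBoolean.getAndSet(true) replaces the @Volatile flag plus synchronized double-checked locking as the run-exactly-once gate, and the cleanup statements are queued onto a named, daemon, single-thread executor so they run in the background without blocking the test or keeping the JVM alive. A self-contained sketch of just those two pieces; runCleanup is a hypothetical placeholder for the DELETE/DROP statements in the diff:

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

object OneShotCleaner {
    private val started = AtomicBoolean(false)
    private val threadId = AtomicInteger(0)

    // getAndSet(true) atomically flips the flag and returns the previous value,
    // so exactly one caller ever schedules the cleanup work.
    fun cleanOnce(runCleanup: () -> Unit) {
        if (started.getAndSet(true)) return
        val executor =
            Executors.newSingleThreadExecutor { runnable ->
                Executors.defaultThreadFactory().newThread(runnable).apply {
                    name = "cleanupThread-${threadId.incrementAndGet()}"
                    isDaemon = true // a daemon thread never keeps the JVM alive just for cleanup
                }
            }
        executor.execute { runCleanup() }
        executor.shutdown() // accept no further tasks; the queued one still runs
    }
}

fun main() {
    repeat(3) { OneShotCleaner.cleanOnce { println("cleanup ran") } } // schedules the work once
    Thread.sleep(200) // give the daemon cleanup thread a moment before the JVM exits
}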

docs/integrations/destinations/snowflake.md

Lines changed: 1 addition & 0 deletions

@@ -268,6 +268,7 @@ desired namespace.
 
 | Version | Date       | Pull Request                                                | Subject                                                   |
 | :------ | :--------- | :---------------------------------------------------------- | :-------------------------------------------------------- |
+| 3.11.12 | 2024-09-12 | [45370](https://github.com/airbytehq/airbyte/pull/45370)   | fix a race condition in our orphanedThreadFilter          |
 | 3.11.11 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476)   | Increase message parsing limit to 100mb                   |
 | 3.11.10 | 2024-08-22 | [\#44526](https://github.com/airbytehq/airbyte/pull/44526) | Revert protocol compliance fix                            |
 | 3.11.9  | 2024-08-19 | [\#43367](https://github.com/airbytehq/airbyte/pull/43367) | Add opt in using MERGE statement for upserts and deletes  |
