|
5 | 5 | package io.airbyte.integrations.destination.databricks.typededupe
|
6 | 6 |
|
7 | 7 | import com.fasterxml.jackson.databind.JsonNode
|
| 8 | +import com.fasterxml.jackson.databind.node.ObjectNode |
8 | 9 | import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase
|
9 | 10 | import io.airbyte.cdk.db.jdbc.JdbcDatabase
|
10 | 11 | import io.airbyte.cdk.db.jdbc.JdbcSourceOperations
|
11 | 12 | import io.airbyte.cdk.integrations.base.JavaBaseConstants
|
| 13 | +import io.airbyte.commons.io.IOs |
| 14 | +import io.airbyte.commons.json.Jsons |
12 | 15 | import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest
|
13 | 16 | import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator
|
14 | 17 | import io.airbyte.integrations.base.destination.typing_deduping.StreamId
|
15 | 18 | import io.airbyte.integrations.destination.databricks.DatabricksConnectorClientsFactory
|
16 |
| -import io.airbyte.integrations.destination.databricks.DatabricksIntegrationTestUtils |
17 | 19 | import io.airbyte.integrations.destination.databricks.jdbc.DatabricksNamingTransformer
|
18 | 20 | import io.airbyte.integrations.destination.databricks.jdbc.DatabricksSqlGenerator
|
19 | 21 | import io.airbyte.integrations.destination.databricks.model.DatabricksConnectorConfig
|
| 22 | +import java.nio.file.Path |
20 | 23 | import java.sql.Connection
|
21 | 24 | import java.sql.ResultSet
|
22 |
| -import java.util.concurrent.TimeUnit |
23 |
| -import org.junit.jupiter.api.BeforeAll |
24 |
| -import org.junit.jupiter.api.Timeout |
25 |
| -import org.mockito.Mockito |
| 25 | +import java.util.Locale |
| 26 | +import org.apache.commons.lang3.RandomStringUtils |
26 | 27 |
|
27 |
| -class DatabricksTypingDedupingTest : BaseTypingDedupingTest() { |
| 28 | +abstract class AbstractDatabricksTypingDedupingTest( |
| 29 | + private val jdbcDatabase: JdbcDatabase, |
| 30 | + private val jsonConfig: JsonNode, |
| 31 | + private val connectorConfig: DatabricksConnectorConfig, |
| 32 | +) : BaseTypingDedupingTest() { |
28 | 33 | override val imageName: String
|
29 | 34 | get() = "airbyte/destination-databricks:dev"
|
30 | 35 |
|
31 | 36 | companion object {
|
32 |
| - private var jdbcDatabase: JdbcDatabase = Mockito.mock() |
33 |
| - private var connectorConfig: DatabricksConnectorConfig = Mockito.mock() |
34 |
| - @JvmStatic |
35 |
| - @BeforeAll |
36 |
| - @Timeout(value = 10, unit = TimeUnit.MINUTES) |
37 |
| - fun setupDatabase() { |
38 |
| - connectorConfig = DatabricksIntegrationTestUtils.oauthConfig |
39 |
| - jdbcDatabase = |
| 37 | + fun setupDatabase( |
| 38 | + connectorConfigPath: String |
| 39 | + ): Triple<JdbcDatabase, JsonNode, DatabricksConnectorConfig> { |
| 40 | + var jsonConfig = Jsons.deserialize(IOs.readFile(Path.of(connectorConfigPath))) |
| 41 | + |
| 42 | + // Randomize the default namespace to avoid collisions between |
| 43 | + // concurrent test runs. |
| 44 | + // Technically, we should probably do this in `generateConfig`, |
| 45 | + // because there could be concurrent test runs within a single class, |
| 46 | + // but we currently only have a single test that uses the default |
| 47 | + // namespace anyway. |
| 48 | + val uniqueSuffix = RandomStringUtils.randomAlphabetic(10).lowercase(Locale.getDefault()) |
| 49 | + val defaultSchema = "typing_deduping_default_schema_$uniqueSuffix" |
| 50 | + val connectorConfig = |
| 51 | + DatabricksConnectorConfig.deserialize(jsonConfig).copy(schema = defaultSchema) |
| 52 | + (jsonConfig as ObjectNode).put("schema", defaultSchema) |
| 53 | + |
| 54 | + val jdbcDatabase = |
40 | 55 | DefaultJdbcDatabase(
|
41 | 56 | DatabricksConnectorClientsFactory.createDataSource(connectorConfig)
|
42 | 57 | )
|
43 | 58 | // This will trigger warehouse start
|
44 | 59 | jdbcDatabase.execute("SELECT 1")
|
| 60 | + |
| 61 | + return Triple(jdbcDatabase, jsonConfig, connectorConfig) |
45 | 62 | }
|
46 | 63 | }
|
47 | 64 |
|
48 | 65 | override fun generateConfig(): JsonNode {
|
49 | 66 | // This method is called in BeforeEach so setup any other references needed per test
|
50 |
| - return DatabricksIntegrationTestUtils.oauthConfigJson.deepCopy() |
| 67 | + return jsonConfig.deepCopy() |
51 | 68 | }
|
52 | 69 |
|
53 | 70 | private fun rawTableIdentifier(
|
@@ -80,7 +97,7 @@ class DatabricksTypingDedupingTest : BaseTypingDedupingTest() {
|
80 | 97 | .executeQuery(
|
81 | 98 | """
|
82 | 99 | SELECT *
|
83 |
| - FROM `${connectorConfig.database}`.$tableIdentifier |
| 100 | + FROM `${connectorConfig.database}`.$tableIdentifier |
84 | 101 | ORDER BY ${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT} ASC
|
85 | 102 | """.trimIndent(),
|
86 | 103 | )
|
|
0 commit comments