
Commit bb1833c

Destination Databricks: Create namespace if missing in CHECK (#45208)
1 parent: 58989b0

3 files changed: +141, -112 lines

airbyte-integrations/connectors/destination-databricks/metadata.yaml (+1, -1)

@@ -2,7 +2,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 072d5540-f236-4294-ba7c-ade8fd918496
-  dockerImageTag: 3.2.3
+  dockerImageTag: 3.2.4
   dockerRepository: airbyte/destination-databricks
   githubIssueLabel: destination-databricks
   icon: databricks.svg

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/DatabricksDestination.kt (+138, -110)

@@ -11,26 +11,33 @@ import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer
 import io.airbyte.cdk.integrations.base.Destination
 import io.airbyte.cdk.integrations.base.IntegrationRunner
 import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
 import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer
 import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager
 import io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage
 import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
 import io.airbyte.integrations.base.destination.operation.DefaultFlush
 import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation
 import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
 import io.airbyte.integrations.base.destination.typing_deduping.ImportType
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
 import io.airbyte.integrations.base.destination.typing_deduping.Sql
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
-import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
 import io.airbyte.integrations.destination.databricks.jdbc.DatabricksDestinationHandler
 import io.airbyte.integrations.destination.databricks.jdbc.DatabricksNamingTransformer
 import io.airbyte.integrations.destination.databricks.jdbc.DatabricksSqlGenerator
 import io.airbyte.integrations.destination.databricks.model.DatabricksConnectorConfig
 import io.airbyte.integrations.destination.databricks.operation.DatabricksStorageOperation
+import io.airbyte.integrations.destination.databricks.operation.DatabricksStreamOperation
 import io.airbyte.integrations.destination.databricks.operation.DatabricksStreamOperationFactory
-import io.airbyte.integrations.destination.databricks.staging.DatabricksFileBufferFactory
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
 import io.airbyte.protocol.models.v0.AirbyteMessage
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
+import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.util.*
@@ -50,124 +57,146 @@ class DatabricksDestination : BaseConnector(), Destination {
     }
 
     override fun check(config: JsonNode): AirbyteConnectionStatus? {
-        // TODO: Add proper checks for
-        // Check schema permissions, or if raw_override and default already exists
-        // Check catalog permissions to USE catalog
-        // Check CREATE volume, COPY INTO, File upload permissions
-        // Check Table creation, Table drop permissions
-        val connectorConfig = DatabricksConnectorConfig.deserialize(config)
-        val sqlGenerator =
-            DatabricksSqlGenerator(DatabricksNamingTransformer(), connectorConfig.database)
-        val datasource = DatabricksConnectorClientsFactory.createDataSource(connectorConfig)
-        val jdbcDatabase = DefaultJdbcDatabase(datasource)
-        val destinationHandler =
-            DatabricksDestinationHandler(sqlGenerator, connectorConfig.database, jdbcDatabase)
-        val workspaceClient =
-            DatabricksConnectorClientsFactory.createWorkspaceClient(
-                connectorConfig.hostname,
-                connectorConfig.authentication
-            )
-        val storageOperations =
-            DatabricksStorageOperation(
-                sqlGenerator,
-                destinationHandler,
-                workspaceClient,
-                connectorConfig.database,
-                connectorConfig.purgeStagingData
-            )
-        val dummyNamespace = connectorConfig.rawSchemaOverride
-        val dummyName = "airbyte_check_test_table"
-        val streamId =
-            StreamId(
-                dummyNamespace,
-                dummyName,
-                dummyNamespace,
-                dummyName,
-                dummyNamespace,
-                dummyName
-            )
-        val streamConfig =
-            StreamConfig(
-                id = streamId,
-                postImportAction = ImportType.APPEND,
-                primaryKey = listOf(),
-                cursor = Optional.empty(),
-                columns = linkedMapOf(),
-                generationId = 1,
-                minimumGenerationId = 1,
-                syncId = 0
-            )
-
-        // quick utility method to drop the airbyte_check_test_table table
-        // returns a connection status if there was an error, or null on success
-        fun dropCheckTable(): AirbyteConnectionStatus? {
-            val dropCheckTableStatement =
-                "DROP TABLE IF EXISTS `${connectorConfig.database}`.`${streamId.rawNamespace}`.`${streamId.rawName}`;"
-            try {
-                destinationHandler.execute(
-                    Sql.of(
-                        dropCheckTableStatement,
-                    ),
+        try {
+            val connectorConfig = DatabricksConnectorConfig.deserialize(config)
+            val datasource = DatabricksConnectorClientsFactory.createDataSource(connectorConfig)
+            val sqlGenerator =
+                DatabricksSqlGenerator(DatabricksNamingTransformer(), connectorConfig.database)
+            val jdbcDatabase = DefaultJdbcDatabase(datasource)
+            val destinationHandler =
+                DatabricksDestinationHandler(sqlGenerator, connectorConfig.database, jdbcDatabase)
+            val workspaceClient =
+                DatabricksConnectorClientsFactory.createWorkspaceClient(
+                    connectorConfig.hostname,
+                    connectorConfig.authentication
                 )
-            } catch (e: Exception) {
-                log.error(e) { "Failed to execute query $dropCheckTableStatement" }
-                return AirbyteConnectionStatus()
-                    .withStatus(AirbyteConnectionStatus.Status.FAILED)
-                    .withMessage("Failed to execute $dropCheckTableStatement: ${e.message}")
-            }
-            return null
-        }
+            val storageOperation =
+                DatabricksStorageOperation(
+                    sqlGenerator,
+                    destinationHandler,
+                    workspaceClient,
+                    connectorConfig.database,
+                    connectorConfig.purgeStagingData
+                )
+            val rawTableNamespace = connectorConfig.rawSchemaOverride
+            val finalTableName = "airbyte_check_test_table"
 
-        // Before we start, clean up any preexisting check table from a previous attempt.
-        dropCheckTable()?.let {
-            return it
-        }
+            // Both raw & final namespaces are the same for the dummy sync, since we don't
+            // do any final table operations in check.
+            val streamId =
+                sqlGenerator.buildStreamId(rawTableNamespace, finalTableName, rawTableNamespace)
+            val streamConfig =
+                StreamConfig(
+                    id = streamId,
+                    postImportAction = ImportType.APPEND,
+                    primaryKey = listOf(),
+                    cursor = Optional.empty(),
+                    columns = linkedMapOf(),
+                    generationId = 1,
+                    minimumGenerationId = 1,
+                    syncId = 0
+                )
 
-        try {
-            storageOperations.prepareStage(streamId, suffix = "")
-        } catch (e: Exception) {
-            log.error(e) { "Failed to prepare stage as part of CHECK" }
-            return AirbyteConnectionStatus()
-                .withStatus(AirbyteConnectionStatus.Status.FAILED)
-                .withMessage("Failed to prepare stage")
-        }
+            // Quick utility function to drop the airbyte_check_test_table table.
+            // Returns a connection status if there was an error, or null on success.
+            fun dropCheckTable(): AirbyteConnectionStatus? {
+                val dropCheckTableStatement =
+                    "DROP TABLE IF EXISTS `${connectorConfig.database}`.`${streamId.rawNamespace}`.`${streamId.rawName}`;"
+                try {
+                    destinationHandler.execute(
+                        Sql.of(
+                            dropCheckTableStatement,
+                        ),
+                    )
+                } catch (e: Exception) {
+                    log.error(e) { "Failed to execute query $dropCheckTableStatement" }
+                    return AirbyteConnectionStatus()
+                        .withStatus(AirbyteConnectionStatus.Status.FAILED)
+                        .withMessage("Failed to execute $dropCheckTableStatement: ${e.message}")
+                }
+                return null
+            }
 
-        try {
-            val writeBuffer = DatabricksFileBufferFactory.createBuffer(FileUploadFormat.CSV)
-            writeBuffer.use {
-                it.accept(
-                    "{\"airbyte_check\":\"passed\"}",
-                    "{}",
-                    generationId = 0,
-                    System.currentTimeMillis()
+            // None of the fields in the destination initial status matter for a dummy sync
+            // with type-dedupe disabled; we only look at these when we perform final-table
+            // setup operations. We just need the streamId to perform the calls in
+            // streamOperation.
+            val initialStatus =
+                DestinationInitialStatus(
+                    streamConfig = streamConfig,
+                    isFinalTablePresent = false,
+                    initialRawTableStatus =
+                        InitialRawTableStatus(
+                            rawTableExists = false,
+                            hasUnprocessedRecords = true,
+                            maxProcessedTimestamp = Optional.empty()
+                        ),
+                    initialTempRawTableStatus =
+                        InitialRawTableStatus(
+                            rawTableExists = false,
+                            hasUnprocessedRecords = true,
+                            maxProcessedTimestamp = Optional.empty()
+                        ),
+                    isSchemaMismatch = true,
+                    isFinalTableEmpty = true,
+                    destinationState = MinimumDestinationState.Impl(needsSoftReset = false),
+                    finalTableGenerationId = null,
+                    finalTempTableGenerationId = null,
                 )
-                it.flush()
-                storageOperations.writeToStage(streamConfig, suffix = "", writeBuffer)
+
+            // We simulate a mini-sync so that the raw table code path is exercised, with
+            // T+D disabled. This code is similar to Snowflake's check.
+            destinationHandler.createNamespaces(setOf(rawTableNamespace))
+            // Before we start, clean up any preexisting check table from a previous
+            // attempt. We also clean up at the end, but some versions of the old connector
+            // didn't clean up properly, so to let them pass the check we do it both before
+            // and after.
+            dropCheckTable()?.let {
+                return it
             }
-        } catch (e: Exception) {
-            log.error(e) { "Failed to write to stage as part of CHECK" }
-            return AirbyteConnectionStatus()
-                .withStatus(AirbyteConnectionStatus.Status.FAILED)
-                .withMessage("Failed to write to stage")
-        }
+            val streamOperation =
+                DatabricksStreamOperation(
+                    storageOperation,
+                    initialStatus,
+                    FileUploadFormat.CSV,
+                    disableTypeDedupe = true
+                )
 
-        try {
-            storageOperations.cleanupStage(streamId)
+            val data =
+                """
+                {"airbyte_check": "passed"}
+                """.trimIndent()
+            val message =
+                PartialAirbyteMessage()
+                    .withSerialized(data)
+                    .withRecord(
+                        PartialAirbyteRecordMessage()
+                            .withEmittedAt(System.currentTimeMillis())
+                            .withMeta(
+                                AirbyteRecordMessageMeta(),
+                            ),
+                    )
+
+            streamOperation.writeRecords(streamConfig, listOf(message).stream())
+            streamOperation.finalizeTable(
+                streamConfig,
+                StreamSyncSummary(1, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)
+            )
+            // Clean up after ourselves.
+            // Not _strictly_ necessary since we do this at the start of `check`,
+            // but it's slightly nicer.
+            dropCheckTable()?.let {
+                return it
+            }
+            return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
         } catch (e: Exception) {
-            log.error(e) { "Failed to cleanup stage" }
+            log.error(e) { "Failed to execute check" }
             return AirbyteConnectionStatus()
                 .withStatus(AirbyteConnectionStatus.Status.FAILED)
-                .withMessage("Failed to cleanup stage")
+                .withMessage("${e.message}")
         }
-
-        // Clean up after ourselves.
-        // Not _strictly_ necessary since we do this at the start of `check`,
-        // but it's slightly nicer.
-        dropCheckTable()?.let {
-            return it
-        }
-
-        return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
     }
 
     override fun getSerializedMessageConsumer(
@@ -176,7 +205,6 @@ class DatabricksDestination : BaseConnector(), Destination {
         outputRecordCollector: Consumer<AirbyteMessage>
     ): SerializedAirbyteMessageConsumer {
 
-        // TODO: Deserialization should be taken care by connector runner framework later
        val connectorConfig = DatabricksConnectorConfig.deserialize(config)

        val sqlGenerator =
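The substantive structural change above is that check() now runs one simulated mini-sync through DatabricksStreamOperation inside a single try/catch and surfaces the underlying exception message, where the old code had a separate catch block per staging step with generic messages. A minimal sketch of that pattern, assuming only the Airbyte protocol model classes; runCheck and dummySync are hypothetical names for illustration, not connector API:

    import io.airbyte.protocol.models.v0.AirbyteConnectionStatus

    // Wrap the whole dummy sync: any failure becomes a FAILED status carrying the
    // real exception message, mirroring the single catch block in the diff above.
    fun runCheck(dummySync: () -> Unit): AirbyteConnectionStatus =
        try {
            dummySync()
            AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
        } catch (e: Exception) {
            AirbyteConnectionStatus()
                .withStatus(AirbyteConnectionStatus.Status.FAILED)
                .withMessage("${e.message}")
        }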

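The fix named in the commit title is the destinationHandler.createNamespaces(setOf(rawTableNamespace)) call that now precedes the dummy sync, so CHECK no longer fails when the configured raw schema does not exist yet. The handler implementation is not part of this diff, so the exact statement is an assumption; a plausible sketch of what namespace creation would issue on Databricks:

    import io.airbyte.integrations.base.destination.typing_deduping.Sql

    // CREATE SCHEMA IF NOT EXISTS is idempotent, so it is safe to run on every CHECK
    // whether or not the namespace already exists. Hypothetical helper, not the
    // actual DatabricksDestinationHandler code.
    fun createNamespaceIfMissing(catalog: String, schema: String): Sql =
        Sql.of("CREATE SCHEMA IF NOT EXISTS `$catalog`.`$schema`;")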
docs/integrations/destinations/databricks.md (+2, -1)

@@ -90,7 +90,8 @@ with the raw tables, and their format is subject to change without notice.
 <summary>Expand to review</summary>
 
 | Version | Date       | Pull Request                                               | Subject                                             |
-| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------- |
+|:--------|:-----------|:------------------------------------------------------------|:------------------------------------------------------|
+| 3.2.4   | 2024-09-09 | [#45208](https://github.com/airbytehq/airbyte/pull/45208) | Fix CHECK to create the namespace if it is missing. |
 | 3.2.3   | 2024-09-03 | [#45115](https://github.com/airbytehq/airbyte/pull/45115) | Clarify Unity Catalog Name option.                  |
 | 3.2.2   | 2024-08-22 | [#44941](https://github.com/airbytehq/airbyte/pull/44941) | Clarify Unity Catalog Path option.                  |
 | 3.2.1   | 2024-08-22 | [#44506](https://github.com/airbytehq/airbyte/pull/44506) | Handle uppercase/mixed-case stream name/namespaces  |
