Skip to content

Commit ba3bdb1

Browse files
authored
Update CDK for Raw Only Dv2 destinations (#36047)
1 parent bc27a84 commit ba3bdb1

File tree

6 files changed

+145
-50
lines changed

6 files changed

+145
-50
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcUtils.kt

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ object JdbcUtils {
6464
)
6565
@JvmStatic val defaultSourceOperations: JdbcSourceOperations = JdbcSourceOperations()
6666

67+
@JvmStatic
6768
val defaultJSONFormat: JSONFormat = JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT)
6869

6970
@JvmStatic
@@ -85,6 +86,7 @@ object JdbcUtils {
8586
}
8687
}
8788

89+
@JvmStatic
8890
@JvmOverloads
8991
fun parseJdbcParameters(
9092
jdbcPropertiesString: String,

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt

+9-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@
33
*/
44
package io.airbyte.cdk.integrations.base
55

6+
import java.util.*
7+
import org.apache.commons.lang3.StringUtils
8+
9+
/**
 * Upper-cases [column] and wraps the result in double quotes, e.g. `my_col` becomes `"MY_COL"`.
 *
 * The case conversion uses [Locale.ROOT] rather than the JVM default locale so the produced
 * identifier is the same on every machine. With a Turkish default locale, for example, a
 * locale-sensitive conversion maps `i` to the dotted capital `İ` instead of `I`.
 *
 * @param column the column name to convert
 * @return the upper-cased name wrapped in double quotes
 */
fun upperQuoted(column: String): String {
    return StringUtils.wrap(column.uppercase(Locale.ROOT), "\"")
}
12+
613
object JavaBaseConstants {
714
const val ARGS_CONFIG_KEY: String = "config"
815
const val ARGS_CATALOG_KEY: String = "catalog"
@@ -33,7 +40,7 @@ object JavaBaseConstants {
3340
COLUMN_NAME_AB_RAW_ID,
3441
COLUMN_NAME_AB_EXTRACTED_AT,
3542
COLUMN_NAME_AB_LOADED_AT,
36-
COLUMN_NAME_DATA
43+
COLUMN_NAME_DATA,
3744
)
3845
@JvmField
3946
val V2_RAW_TABLE_COLUMN_NAMES: List<String> =
@@ -42,7 +49,7 @@ object JavaBaseConstants {
4249
COLUMN_NAME_AB_EXTRACTED_AT,
4350
COLUMN_NAME_AB_LOADED_AT,
4451
COLUMN_NAME_DATA,
45-
COLUMN_NAME_AB_META
52+
COLUMN_NAME_AB_META,
4653
)
4754
@JvmField
4855
val V2_FINAL_TABLE_METADATA_COLUMNS: List<String> =
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.29.8
1+
version=0.29.9

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt

+82-42
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
5656
protected val configSchemaKey: String
5757
get() = "schema"
5858

59+
/**
 * Hook that lets a destination opt out of typing and deduping unconditionally. The base
 * implementation returns `false`; a destination that must never type and dedupe overrides it
 * to return `true`. Typing and deduping only happens when final tables are created.
 *
 * @return `true` if typing and deduping must always be disabled for this destination
 */
protected open fun shouldAlwaysDisableTypeDedupe(): Boolean = false
68+
5969
override fun check(config: JsonNode): AirbyteConnectionStatus? {
6070
val dataSource = getDataSource(config)
6171

@@ -67,7 +77,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
6777
val v2RawSchema =
6878
namingResolver.getIdentifier(
6979
getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
70-
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE)
80+
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE),
7181
)
7282
attemptTableOperations(v2RawSchema, database, namingResolver, sqlOperations, false)
7383
destinationSpecificTableOperations(database)
@@ -87,7 +97,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
8797
"""
8898
Could not connect with provided configuration.
8999
${e.message}
90-
""".trimIndent()
100+
""".trimIndent(),
91101
)
92102
} finally {
93103
try {
@@ -123,7 +133,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
123133
jdbcConfig[JdbcUtils.PASSWORD_KEY].asText()
124134
else null,
125135
driverClassName,
126-
jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText()
136+
jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(),
127137
)
128138
.withConnectionProperties(connectionProperties)
129139
.withConnectionTimeout(getConnectionTimeout(connectionProperties))
@@ -155,8 +165,10 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
155165
for (key in defaultParameters.keys) {
156166
require(
157167
!(customParameters.containsKey(key) &&
158-
customParameters[key] != defaultParameters[key])
159-
) { "Cannot overwrite default JDBC parameter $key" }
168+
customParameters[key] != defaultParameters[key]),
169+
) {
170+
"Cannot overwrite default JDBC parameter $key"
171+
}
160172
}
161173
}
162174

@@ -191,7 +203,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
191203
* @param config
192204
* @return
193205
*/
194-
protected fun getDatabaseName(config: JsonNode): String {
206+
protected open fun getDatabaseName(config: JsonNode): String =
    // Reads the database name straight from the config's database key.
    config[JdbcUtils.DATABASE_KEY].asText()
197209

@@ -227,7 +239,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
227239
config,
228240
catalog,
229241
null,
230-
NoopTyperDeduper()
242+
NoopTyperDeduper(),
231243
)
232244
}
233245

@@ -238,10 +250,18 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
238250
catalog,
239251
outputRecordCollector,
240252
database,
241-
defaultNamespace
253+
defaultNamespace,
242254
)
243255
}
244256

257+
/**
 * Decides whether typing and deduping is turned off for this sync.
 *
 * It is off when the destination disables it unconditionally via
 * [shouldAlwaysDisableTypeDedupe], or when [config] contains the `DISABLE_TYPE_DEDUPE` field
 * and that field reads as boolean `true` (anything else falls back to `false`).
 *
 * @param config the connector configuration
 * @return `true` if typing and deduping should be skipped
 */
private fun isTypeDedupeDisabled(config: JsonNode): Boolean {
    // A destination-level override wins without consulting the config.
    if (shouldAlwaysDisableTypeDedupe()) {
        return true
    }
    // A missing field means the flag was not set.
    if (!config.has(DISABLE_TYPE_DEDUPE)) {
        return false
    }
    return config[DISABLE_TYPE_DEDUPE].asBoolean(false)
}
264+
245265
private fun getV2MessageConsumer(
246266
config: JsonNode,
247267
catalog: ConfiguredAirbyteCatalog?,
@@ -256,51 +276,71 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
256276
.map { override: String -> CatalogParser(sqlGenerator, override) }
257277
.orElse(CatalogParser(sqlGenerator))
258278
.parseCatalog(catalog!!)
279+
val typerDeduper: TyperDeduper =
280+
buildTyperDeduper(
281+
config,
282+
database,
283+
parsedCatalog,
284+
)
285+
286+
return JdbcBufferedConsumerFactory.createAsync(
287+
outputRecordCollector,
288+
database,
289+
sqlOperations,
290+
namingResolver,
291+
config,
292+
catalog,
293+
defaultNamespace,
294+
typerDeduper,
295+
getDataTransformer(parsedCatalog, defaultNamespace),
296+
)
297+
}
298+
299+
/**
 * Builds the [TyperDeduper] used by the V2 message consumer.
 *
 * Selection:
 * - typing and deduping enabled: a [DefaultTyperDeduper];
 * - disabled and no migrations are registered: a plain [NoopTyperDeduper];
 * - disabled but migrations exist: a [NoOpTyperDeduperWithV1V2Migrations], which is handed the
 *   migrators and migrations.
 *
 * @param config the connector configuration
 * @param database the JDBC database the sync writes to
 * @param parsedCatalog the parsed catalog for the sync
 * @return the typer-deduper matching the configuration
 */
private fun buildTyperDeduper(
    config: JsonNode,
    database: JdbcDatabase,
    parsedCatalog: ParsedCatalog,
): TyperDeduper {
    // The collaborators are created in the same order as before; only the final selection
    // is expressed differently.
    val dbName = getDatabaseName(config)
    val tableMigrator = NoopV2TableMigrator()
    val v1V2Migrator = JdbcV1V2Migrator(namingResolver, database, dbName)
    val rawNamespace =
        getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
            .orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE)
    val handler: DestinationHandler<DestinationState> =
        getDestinationHandler(dbName, database, rawNamespace)
    val typeDedupeDisabled = isTypeDedupeDisabled(config)
    val registeredMigrations = getMigrations(database, dbName, sqlGenerator, handler)

    return when {
        !typeDedupeDisabled ->
            DefaultTyperDeduper(
                sqlGenerator,
                handler,
                parsedCatalog,
                v1V2Migrator,
                tableMigrator,
                registeredMigrations,
            )
        registeredMigrations.isEmpty() -> NoopTyperDeduper()
        else ->
            NoOpTyperDeduperWithV1V2Migrations(
                sqlGenerator,
                handler,
                parsedCatalog,
                v1V2Migrator,
                tableMigrator,
                registeredMigrations,
            )
    }
}
305345

306346
companion object {
@@ -361,7 +401,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
361401
{ conn: Connection -> conn.metaData.catalogs },
362402
{ queryContext: ResultSet? ->
363403
JdbcUtils.defaultSourceOperations.rowToJson(queryContext!!)
364-
}
404+
},
365405
)
366406

367407
// verify we have write permissions on the target schema by creating a table with a
@@ -370,7 +410,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
370410
val outputTableName =
371411
namingResolver.getIdentifier(
372412
"_airbyte_connection_test_" +
373-
UUID.randomUUID().toString().replace("-".toRegex(), "")
413+
UUID.randomUUID().toString().replace("-".toRegex(), ""),
374414
)
375415
sqlOps.createSchemaIfNotExists(database, outputSchema)
376416
sqlOps.createTableIfNotExists(database, outputSchema, outputTableName)
@@ -381,7 +421,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
381421
database,
382422
java.util.List.of(dummyRecord),
383423
outputSchema,
384-
outputTableName
424+
outputTableName,
385425
)
386426
}
387427
} finally {
@@ -412,7 +452,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
412452
.withRecord(
413453
PartialAirbyteRecordMessage()
414454
.withStream("stream1")
415-
.withEmittedAt(1602637589000L)
455+
.withEmittedAt(1602637589000L),
416456
)
417457
.withSerialized(dummyDataToInsert.toString())
418458
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping
6+
7+
import com.fasterxml.jackson.databind.JsonNode
8+
import io.airbyte.cdk.db.jdbc.JdbcDatabase
9+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
10+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
11+
import io.airbyte.integrations.base.destination.typing_deduping.Sql
12+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
13+
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
14+
import org.jooq.SQLDialect
15+
16+
/**
 * A [JdbcDestinationHandler] for destinations that do not support typing and deduping. Every
 * operation overridden here throws [NotImplementedError].
 *
 * The constructor arguments are passed unchanged to [JdbcDestinationHandler].
 */
class NoOpJdbcDestinationHandler<DestinationState>(
    databaseName: String,
    jdbcDatabase: JdbcDatabase,
    rawTableSchemaName: String,
    sqlDialect: SQLDialect
) : JdbcDestinationHandler<DestinationState>(databaseName, jdbcDatabase, rawTableSchemaName, sqlDialect) {

    /** Always throws [NotImplementedError]. */
    override fun execute(sql: Sql) {
        typingDedupingUnsupported()
    }

    /** Always throws [NotImplementedError]. */
    override fun gatherInitialState(
        streamConfigs: List<StreamConfig>
    ): List<DestinationInitialStatus<DestinationState>> = typingDedupingUnsupported()

    /** Always throws [NotImplementedError]. */
    override fun commitDestinationStates(destinationStates: Map<StreamId, DestinationState>) {
        typingDedupingUnsupported()
    }

    /** Always throws [NotImplementedError]. */
    override fun toDestinationState(json: JsonNode?): DestinationState = typingDedupingUnsupported()

    /** Always throws [NotImplementedError]. */
    override fun toJdbcTypeName(airbyteType: AirbyteType?): String = typingDedupingUnsupported()

    /** Single place that raises the error shared by every overridden operation. */
    private fun typingDedupingUnsupported(): Nothing =
        throw NotImplementedError("This JDBC Destination Handler does not support typing deduping")
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt

+1-5
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import io.airbyte.configoss.JobGetSpecConfig
2525
import io.airbyte.configoss.OperatorDbt
2626
import io.airbyte.configoss.StandardCheckConnectionInput
2727
import io.airbyte.configoss.StandardCheckConnectionOutput
28-
import io.airbyte.configoss.StandardCheckConnectionOutput.Status
2928
import io.airbyte.configoss.WorkerDestinationConfig
3029
import io.airbyte.protocol.models.Field
3130
import io.airbyte.protocol.models.JsonSchemaType
@@ -64,9 +63,6 @@ import java.util.concurrent.atomic.AtomicInteger
6463
import java.util.function.Consumer
6564
import java.util.stream.Collectors
6665
import java.util.stream.Stream
67-
import kotlin.Comparator
68-
import kotlin.collections.ArrayList
69-
import kotlin.collections.HashSet
7066
import kotlin.test.assertNotNull
7167
import org.junit.jupiter.api.*
7268
import org.junit.jupiter.api.extension.ExtensionContext
@@ -345,7 +341,7 @@ abstract class DestinationAcceptanceTest {
345341
"""This method is moved to the AdvancedTestDataComparator. Please move your destination
346342
implementation of the method to your comparator implementation."""
347343
)
348-
protected fun resolveIdentifier(identifier: String?): List<String?> {
344+
protected open fun resolveIdentifier(identifier: String?): List<String?> {
    // Kotlin's listOf accepts a null element, whereas java.util.List.of throws
    // NullPointerException for one. The nullable parameter and the List<String?> return
    // type both allow null, so a null identifier must yield a one-element list.
    return listOf(identifier)
}
351347

0 commit comments

Comments
 (0)