Skip to content

Update CDK for Raw Only Dv2 destinations #36047

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ object JdbcUtils {
)
@JvmStatic val defaultSourceOperations: JdbcSourceOperations = JdbcSourceOperations()

@JvmStatic
val defaultJSONFormat: JSONFormat = JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT)

@JvmStatic
Expand All @@ -85,6 +86,7 @@ object JdbcUtils {
}
}

@JvmStatic
@JvmOverloads
fun parseJdbcParameters(
jdbcPropertiesString: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
*/
package io.airbyte.cdk.integrations.base

import java.util.*
import org.apache.commons.lang3.StringUtils

fun upperQuoted(column: String): String {
Copy link
Contributor

@stephane-airbyte stephane-airbyte Apr 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this outside of the object, and not used in the object (or anywhere in this PR)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in the stacked PR on top - it could probably be moved to another package if that makes more sense.

I thought keeping static methods outside of an object was the Kotlin way - do we prefer a different approach?

return StringUtils.wrap(column.uppercase(Locale.getDefault()), "\"")
}

object JavaBaseConstants {
const val ARGS_CONFIG_KEY: String = "config"
const val ARGS_CATALOG_KEY: String = "catalog"
Expand Down Expand Up @@ -33,7 +40,7 @@ object JavaBaseConstants {
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA
COLUMN_NAME_DATA,
)
@JvmField
val V2_RAW_TABLE_COLUMN_NAMES: List<String> =
Expand All @@ -42,7 +49,7 @@ object JavaBaseConstants {
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META
COLUMN_NAME_AB_META,
)
@JvmField
val V2_FINAL_TABLE_METADATA_COLUMNS: List<String> =
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.29.8
version=0.29.9
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
protected val configSchemaKey: String
get() = "schema"

/**
 * Hook for destinations that never type and dedupe. The default is `false`; a subclass
 * overrides this to return `true` to disable type dedupe regardless of the connector config.
 * Typing and deduping only happens when final tables are created.
 *
 * @return `true` if type dedupe must always be disabled for this destination
 */
protected open fun shouldAlwaysDisableTypeDedupe(): Boolean = false

override fun check(config: JsonNode): AirbyteConnectionStatus? {
val dataSource = getDataSource(config)

Expand All @@ -67,7 +77,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
val v2RawSchema =
namingResolver.getIdentifier(
getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE),
)
attemptTableOperations(v2RawSchema, database, namingResolver, sqlOperations, false)
destinationSpecificTableOperations(database)
Expand All @@ -87,7 +97,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
"""
Could not connect with provided configuration.
${e.message}
""".trimIndent()
""".trimIndent(),
)
} finally {
try {
Expand Down Expand Up @@ -123,7 +133,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
jdbcConfig[JdbcUtils.PASSWORD_KEY].asText()
else null,
driverClassName,
jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText()
jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(),
)
.withConnectionProperties(connectionProperties)
.withConnectionTimeout(getConnectionTimeout(connectionProperties))
Expand Down Expand Up @@ -155,8 +165,10 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
for (key in defaultParameters.keys) {
require(
!(customParameters.containsKey(key) &&
customParameters[key] != defaultParameters[key])
) { "Cannot overwrite default JDBC parameter $key" }
customParameters[key] != defaultParameters[key]),
) {
"Cannot overwrite default JDBC parameter $key"
}
}
}

Expand Down Expand Up @@ -191,7 +203,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
* @param config
* @return
*/
protected fun getDatabaseName(config: JsonNode): String {
protected open fun getDatabaseName(config: JsonNode): String {
return config[JdbcUtils.DATABASE_KEY].asText()
}

Expand Down Expand Up @@ -227,7 +239,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
config,
catalog,
null,
NoopTyperDeduper()
NoopTyperDeduper(),
)
}

Expand All @@ -238,10 +250,18 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
catalog,
outputRecordCollector,
database,
defaultNamespace
defaultNamespace,
)
}

/**
 * Decides whether type dedupe is turned off for this sync.
 *
 * @param config connector configuration, checked for the [DISABLE_TYPE_DEDUPE] field
 * @return `true` when [shouldAlwaysDisableTypeDedupe] returns `true`, or when the config has a
 *   [DISABLE_TYPE_DEDUPE] field whose `asBoolean(false)` value is `true`
 */
private fun isTypeDedupeDisabled(config: JsonNode): Boolean {
    // The destination-level override wins; the config is not consulted in that case.
    if (shouldAlwaysDisableTypeDedupe()) {
        return true
    }
    if (!config.has(DISABLE_TYPE_DEDUPE)) {
        return false
    }
    return config[DISABLE_TYPE_DEDUPE].asBoolean(false)
}

private fun getV2MessageConsumer(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
Expand All @@ -256,51 +276,71 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
.map { override: String -> CatalogParser(sqlGenerator, override) }
.orElse(CatalogParser(sqlGenerator))
.parseCatalog(catalog!!)
val typerDeduper: TyperDeduper =
buildTyperDeduper(
config,
database,
parsedCatalog,
)

return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
database,
sqlOperations,
namingResolver,
config,
catalog,
defaultNamespace,
typerDeduper,
getDataTransformer(parsedCatalog, defaultNamespace),
)
}

private fun buildTyperDeduper(
config: JsonNode,
database: JdbcDatabase,
parsedCatalog: ParsedCatalog,
): TyperDeduper {
val databaseName = getDatabaseName(config)
val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
val v2TableMigrator = NoopV2TableMigrator()
val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
val destinationHandler: DestinationHandler<DestinationState> =
getDestinationHandler(
databaseName,
database,
rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE)
getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE),
)
val disableTypeDedupe =
config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false)
val typerDeduper: TyperDeduper
val disableTypeDedupe = isTypeDedupeDisabled(config)
val migrations = getMigrations(database, databaseName, sqlGenerator, destinationHandler)
typerDeduper =
if (disableTypeDedupe) {
NoOpTyperDeduperWithV1V2Migrations(
sqlGenerator,
destinationHandler,
parsedCatalog,
migrator,
v2TableMigrator,
migrations
)
} else {

val typerDeduper: TyperDeduper
if (disableTypeDedupe) {
typerDeduper =
if (migrations.isEmpty()) {
NoopTyperDeduper()
} else {
NoOpTyperDeduperWithV1V2Migrations(
sqlGenerator,
destinationHandler,
parsedCatalog,
migrator,
v2TableMigrator,
migrations,
)
}
} else {
typerDeduper =
DefaultTyperDeduper(
sqlGenerator,
destinationHandler,
parsedCatalog,
migrator,
v2TableMigrator,
migrations
migrations,
)
}

return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
database,
sqlOperations,
namingResolver,
config,
catalog,
defaultNamespace,
typerDeduper,
getDataTransformer(parsedCatalog, defaultNamespace)
)
}
return typerDeduper
}

companion object {
Expand Down Expand Up @@ -361,7 +401,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
{ conn: Connection -> conn.metaData.catalogs },
{ queryContext: ResultSet? ->
JdbcUtils.defaultSourceOperations.rowToJson(queryContext!!)
}
},
)

// verify we have write permissions on the target schema by creating a table with a
Expand All @@ -370,7 +410,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
val outputTableName =
namingResolver.getIdentifier(
"_airbyte_connection_test_" +
UUID.randomUUID().toString().replace("-".toRegex(), "")
UUID.randomUUID().toString().replace("-".toRegex(), ""),
)
sqlOps.createSchemaIfNotExists(database, outputSchema)
sqlOps.createTableIfNotExists(database, outputSchema, outputTableName)
Expand All @@ -381,7 +421,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
database,
java.util.List.of(dummyRecord),
outputSchema,
outputTableName
outputTableName,
)
}
} finally {
Expand Down Expand Up @@ -412,7 +452,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
.withRecord(
PartialAirbyteRecordMessage()
.withStream("stream1")
.withEmittedAt(1602637589000L)
.withEmittedAt(1602637589000L),
)
.withSerialized(dummyDataToInsert.toString())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.Sql
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import org.jooq.SQLDialect

/**
 * A [JdbcDestinationHandler] whose overridden operations are all unsupported.
 *
 * The constructor arguments are forwarded unchanged to [JdbcDestinationHandler]. Every method
 * overridden here throws [NotImplementedError] with the message that this handler does not
 * support typing deduping.
 */
class NoOpJdbcDestinationHandler<DestinationState>(
    databaseName: String,
    jdbcDatabase: JdbcDatabase,
    rawTableSchemaName: String,
    sqlDialect: SQLDialect
) :
    JdbcDestinationHandler<DestinationState>(
        databaseName,
        jdbcDatabase,
        rawTableSchemaName,
        sqlDialect
    ) {

    /** Always throws [NotImplementedError]; [sql] is never executed. */
    override fun execute(sql: Sql): Unit = unsupported()

    /** Always throws [NotImplementedError]; [streamConfigs] is not inspected. */
    override fun gatherInitialState(
        streamConfigs: List<StreamConfig>
    ): List<DestinationInitialStatus<DestinationState>> = unsupported()

    /** Always throws [NotImplementedError]; [destinationStates] is not persisted. */
    override fun commitDestinationStates(destinationStates: Map<StreamId, DestinationState>): Unit =
        unsupported()

    /** Always throws [NotImplementedError]; [json] is not parsed. */
    override fun toDestinationState(json: JsonNode?): DestinationState = unsupported()

    /** Always throws [NotImplementedError]; [airbyteType] is not mapped. */
    override fun toJdbcTypeName(airbyteType: AirbyteType?): String = unsupported()

    /** Single throw site shared by every override so the message is defined once. */
    private fun unsupported(): Nothing = throw NotImplementedError(UNSUPPORTED_MESSAGE)

    private companion object {
        const val UNSUPPORTED_MESSAGE =
            "This JDBC Destination Handler does not support typing deduping"
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import io.airbyte.configoss.JobGetSpecConfig
import io.airbyte.configoss.OperatorDbt
import io.airbyte.configoss.StandardCheckConnectionInput
import io.airbyte.configoss.StandardCheckConnectionOutput
import io.airbyte.configoss.StandardCheckConnectionOutput.Status
import io.airbyte.configoss.WorkerDestinationConfig
import io.airbyte.protocol.models.Field
import io.airbyte.protocol.models.JsonSchemaType
Expand Down Expand Up @@ -64,9 +63,6 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import kotlin.Comparator
import kotlin.collections.ArrayList
import kotlin.collections.HashSet
import kotlin.test.assertNotNull
import org.junit.jupiter.api.*
import org.junit.jupiter.api.extension.ExtensionContext
Expand Down Expand Up @@ -345,7 +341,7 @@ abstract class DestinationAcceptanceTest {
"""This method is moved to the AdvancedTestDataComparator. Please move your destination
implementation of the method to your comparator implementation."""
)
protected fun resolveIdentifier(identifier: String?): List<String?> {
protected open fun resolveIdentifier(identifier: String?): List<String?> {
return java.util.List.of(identifier)
}

Expand Down
Loading