diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcGenerationHandler.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcGenerationHandler.kt new file mode 100644 index 0000000000000..ace75630e7829 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcGenerationHandler.kt @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.destination.jdbc + +import io.airbyte.cdk.db.jdbc.JdbcDatabase + +interface JdbcGenerationHandler { + /** + * get the value of _airbyte_generation_id for any row in table {@code rawNamespace.rawName} + * + * @returns true if the table exists and contains such a row, false otherwise + */ + fun getGenerationIdInTable(database: JdbcDatabase, namespace: String, name: String): Long? +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt index 9038c2e65e4f7..107d44a4b8c01 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt @@ -132,13 +132,6 @@ interface SqlOperations { /** Check if the data record is valid and ok to be written to destination */ fun isValidData(data: JsonNode?): Boolean - /** - * get the value of _airbyte_generation_id for any row in table {@code rawNamespace.rawName} - * - * @returns true if the table exists and contains such a row, false otherwise - */ - fun getGenerationIdInTable(database: JdbcDatabase, namespace: String, name: String): Long? - /** overwrite the raw table with the temporary raw table */ fun overwriteRawTable(database: JdbcDatabase, rawNamespace: String, rawName: String) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt index 30efc96bd4876..ddbb111882d11 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt @@ -189,6 +189,8 @@ abstract class AbstractJdbcDestination, database: JdbcDatabase, sqlOperations: SqlOperations, + generationIdHandler: JdbcGenerationHandler, namingResolver: NamingConventionTransformer, config: JsonNode, catalog: ConfiguredAirbyteCatalog, @@ -77,12 +78,19 @@ object JdbcBufferedConsumerFactory { onStartFunction( database, sqlOperations, + generationIdHandler, writeConfigs, typerDeduper, namingResolver, parsedCatalog ), - onCloseFunction(database, sqlOperations, parsedCatalog, typerDeduper), + onCloseFunction( + database, + sqlOperations, + generationIdHandler, + parsedCatalog, + typerDeduper + ), JdbcInsertFlushFunction( defaultNamespace, recordWriterFunction(database, sqlOperations, writeConfigs, catalog), @@ -98,7 +106,7 @@ object JdbcBufferedConsumerFactory { private fun createWriteConfigs( database: JdbcDatabase, - sqlOperations: SqlOperations, + generationIdHandler: JdbcGenerationHandler, namingResolver: NamingConventionTransformer, parsedCatalog: ParsedCatalog, ): List { @@ -106,7 +114,7 @@ object JdbcBufferedConsumerFactory { val rawSuffix: String = if ( it.minimumGenerationId == 0L || - sqlOperations.getGenerationIdInTable( + generationIdHandler.getGenerationIdInTable( database, it.id.rawNamespace, it.id.rawName @@ -162,6 +170,7 @@ object JdbcBufferedConsumerFactory { private fun onStartFunction( database: JdbcDatabase, sqlOperations: SqlOperations, + generationIdHandler: JdbcGenerationHandler, writeConfigs: MutableList, typerDeduper: TyperDeduper, namingResolver: NamingConventionTransformer, @@ -170,7 +179,7 @@ object JdbcBufferedConsumerFactory { return OnStartFunction { typerDeduper.prepareSchemasAndRunMigrations() writeConfigs.addAll( - createWriteConfigs(database, sqlOperations, namingResolver, parsedCatalog) + createWriteConfigs(database, generationIdHandler, namingResolver, parsedCatalog) ) LOGGER.info { "Preparing raw tables in destination started for ${writeConfigs.size} streams" @@ -194,7 +203,7 @@ object JdbcBufferedConsumerFactory { 0L -> {} writeConfig.generationId -> if ( - sqlOperations.getGenerationIdInTable( + generationIdHandler.getGenerationIdInTable( database, schemaName, dstTableName + writeConfig.rawTableSuffix @@ -268,6 +277,7 @@ object JdbcBufferedConsumerFactory { private fun onCloseFunction( database: JdbcDatabase, sqlOperations: SqlOperations, + generationIdHandler: JdbcGenerationHandler, catalog: ParsedCatalog, typerDeduper: TyperDeduper ): OnCloseFunction { @@ -278,7 +288,7 @@ object JdbcBufferedConsumerFactory { catalog.streams.forEach { if ( it.minimumGenerationId != 0L && - sqlOperations.getGenerationIdInTable( + generationIdHandler.getGenerationIdInTable( database, it.id.rawNamespace, it.id.rawName diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index aaef160bbb9e5..6e767e9cb6be6 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -9,7 +9,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase import io.airbyte.cdk.integrations.base.JavaBaseConstants import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition -import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations +import io.airbyte.cdk.integrations.destination.jdbc.JdbcGenerationHandler import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst import io.airbyte.commons.concurrency.CompletableFutures @@ -55,7 +55,7 @@ abstract class JdbcDestinationHandler( protected val rawTableNamespace: String, private val dialect: SQLDialect, private val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION, - private val sqlOperations: SqlOperations, + protected val generationHandler: JdbcGenerationHandler, ) : DestinationHandler { protected val dslContext: DSLContext get() = DSL.using(dialect) @@ -343,13 +343,13 @@ abstract class JdbcDestinationHandler( isFinalTableEmpty, destinationState, finalTableGenerationId = - sqlOperations.getGenerationIdInTable( + generationHandler.getGenerationIdInTable( jdbcDatabase, streamConfig.id.rawNamespace, streamConfig.id.rawName ), finalTempTableGenerationId = - sqlOperations.getGenerationIdInTable( + generationHandler.getGenerationIdInTable( jdbcDatabase, streamConfig.id.rawNamespace, streamConfig.id.rawName + AbstractStreamOperation.TMP_TABLE_SUFFIX diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt index 633f0e56d4fb7..76bec314d62bf 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt @@ -6,7 +6,7 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.db.jdbc.JdbcDatabase -import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations +import io.airbyte.cdk.integrations.destination.jdbc.JdbcGenerationHandler import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus import io.airbyte.integrations.base.destination.typing_deduping.Sql @@ -19,14 +19,14 @@ class NoOpJdbcDestinationHandler( jdbcDatabase: JdbcDatabase, rawTableSchemaName: String, sqlDialect: SQLDialect, - sqlOperations: SqlOperations, + generationHandler: JdbcGenerationHandler, ) : JdbcDestinationHandler( databaseName, jdbcDatabase, rawTableSchemaName, sqlDialect, - sqlOperations = sqlOperations + generationHandler = generationHandler, ) { override fun execute(sql: Sql) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.kt index dd76097385e1d..ea81b7514b701 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.kt @@ -153,6 +153,8 @@ class AbstractJdbcDestinationTest { override fun getSqlOperations(config: JsonNode): SqlOperations = TestJdbcSqlOperations() + override fun getGenerationHandler(): JdbcGenerationHandler = mock() + override fun getDestinationHandler( config: JsonNode, databaseName: String, diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.kt index b42c3f9b3a26e..ff79dd1b09287 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.kt @@ -23,15 +23,6 @@ class TestJdbcSqlOperations : JdbcSqlOperations() { // Not required for the testing } - override fun getGenerationIdInTable( - database: JdbcDatabase, - rawNamespace: String, - rawName: String - ): Long { - return -1L - // Not required for the testing - } - override fun overwriteRawTable(database: JdbcDatabase, rawNamespace: String, rawName: String) { // Not required for the testing }