Skip to content

Commit 52130f5

Browse files
cdk-java move the generationId handling to its own class (#43329)
This pull request introduces a new interface, `JdbcGenerationHandler`, to handle the retrieval of the `_airbyte_generation_id` for any row in a table. The `getGenerationIdInTable` method has been moved from `SqlOperations` to this new interface. The necessary changes have been made across various classes to integrate this new interface properly.
1 parent c74bce4 commit 52130f5

File tree

8 files changed

+44
-29
lines changed

8 files changed

+44
-29
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.destination.jdbc
6+
7+
import io.airbyte.cdk.db.jdbc.JdbcDatabase
8+
9+
interface JdbcGenerationHandler {
10+
/**
11+
* get the value of _airbyte_generation_id for any row in table {@code rawNamespace.rawName}
12+
*
13+
* @returns true if the table exists and contains such a row, false otherwise
14+
*/
15+
fun getGenerationIdInTable(database: JdbcDatabase, namespace: String, name: String): Long?
16+
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt

-7
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,6 @@ interface SqlOperations {
132132
/** Check if the data record is valid and ok to be written to destination */
133133
fun isValidData(data: JsonNode?): Boolean
134134

135-
/**
136-
* get the value of _airbyte_generation_id for any row in table {@code rawNamespace.rawName}
137-
*
138-
* @returns true if the table exists and contains such a row, false otherwise
139-
*/
140-
fun getGenerationIdInTable(database: JdbcDatabase, namespace: String, name: String): Long?
141-
142135
/** overwrite the raw table with the temporary raw table */
143136
fun overwriteRawTable(database: JdbcDatabase, rawNamespace: String, rawName: String)
144137

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt

+3
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
189189

190190
protected abstract fun getSqlOperations(config: JsonNode): SqlOperations
191191

192+
protected abstract fun getGenerationHandler(): JdbcGenerationHandler
193+
192194
protected abstract fun getDestinationHandler(
193195
config: JsonNode,
194196
databaseName: String,
@@ -289,6 +291,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
289291
outputRecordCollector,
290292
database,
291293
getSqlOperations(config),
294+
getGenerationHandler(),
292295
namingResolver,
293296
config,
294297
catalog,

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt

+16-6
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ object JdbcBufferedConsumerFactory {
5555
outputRecordCollector: Consumer<AirbyteMessage>,
5656
database: JdbcDatabase,
5757
sqlOperations: SqlOperations,
58+
generationIdHandler: JdbcGenerationHandler,
5859
namingResolver: NamingConventionTransformer,
5960
config: JsonNode,
6061
catalog: ConfiguredAirbyteCatalog,
@@ -77,12 +78,19 @@ object JdbcBufferedConsumerFactory {
7778
onStartFunction(
7879
database,
7980
sqlOperations,
81+
generationIdHandler,
8082
writeConfigs,
8183
typerDeduper,
8284
namingResolver,
8385
parsedCatalog
8486
),
85-
onCloseFunction(database, sqlOperations, parsedCatalog, typerDeduper),
87+
onCloseFunction(
88+
database,
89+
sqlOperations,
90+
generationIdHandler,
91+
parsedCatalog,
92+
typerDeduper
93+
),
8694
JdbcInsertFlushFunction(
8795
defaultNamespace,
8896
recordWriterFunction(database, sqlOperations, writeConfigs, catalog),
@@ -98,15 +106,15 @@ object JdbcBufferedConsumerFactory {
98106

99107
private fun createWriteConfigs(
100108
database: JdbcDatabase,
101-
sqlOperations: SqlOperations,
109+
generationIdHandler: JdbcGenerationHandler,
102110
namingResolver: NamingConventionTransformer,
103111
parsedCatalog: ParsedCatalog,
104112
): List<WriteConfig> {
105113
return parsedCatalog.streams.map {
106114
val rawSuffix: String =
107115
if (
108116
it.minimumGenerationId == 0L ||
109-
sqlOperations.getGenerationIdInTable(
117+
generationIdHandler.getGenerationIdInTable(
110118
database,
111119
it.id.rawNamespace,
112120
it.id.rawName
@@ -162,6 +170,7 @@ object JdbcBufferedConsumerFactory {
162170
private fun onStartFunction(
163171
database: JdbcDatabase,
164172
sqlOperations: SqlOperations,
173+
generationIdHandler: JdbcGenerationHandler,
165174
writeConfigs: MutableList<WriteConfig>,
166175
typerDeduper: TyperDeduper,
167176
namingResolver: NamingConventionTransformer,
@@ -170,7 +179,7 @@ object JdbcBufferedConsumerFactory {
170179
return OnStartFunction {
171180
typerDeduper.prepareSchemasAndRunMigrations()
172181
writeConfigs.addAll(
173-
createWriteConfigs(database, sqlOperations, namingResolver, parsedCatalog)
182+
createWriteConfigs(database, generationIdHandler, namingResolver, parsedCatalog)
174183
)
175184
LOGGER.info {
176185
"Preparing raw tables in destination started for ${writeConfigs.size} streams"
@@ -194,7 +203,7 @@ object JdbcBufferedConsumerFactory {
194203
0L -> {}
195204
writeConfig.generationId ->
196205
if (
197-
sqlOperations.getGenerationIdInTable(
206+
generationIdHandler.getGenerationIdInTable(
198207
database,
199208
schemaName,
200209
dstTableName + writeConfig.rawTableSuffix
@@ -268,6 +277,7 @@ object JdbcBufferedConsumerFactory {
268277
private fun onCloseFunction(
269278
database: JdbcDatabase,
270279
sqlOperations: SqlOperations,
280+
generationIdHandler: JdbcGenerationHandler,
271281
catalog: ParsedCatalog,
272282
typerDeduper: TyperDeduper
273283
): OnCloseFunction {
@@ -278,7 +288,7 @@ object JdbcBufferedConsumerFactory {
278288
catalog.streams.forEach {
279289
if (
280290
it.minimumGenerationId != 0L &&
281-
sqlOperations.getGenerationIdInTable(
291+
generationIdHandler.getGenerationIdInTable(
282292
database,
283293
it.id.rawNamespace,
284294
it.id.rawName

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase
99
import io.airbyte.cdk.integrations.base.JavaBaseConstants
1010
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
1111
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
12-
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations
12+
import io.airbyte.cdk.integrations.destination.jdbc.JdbcGenerationHandler
1313
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
1414
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
1515
import io.airbyte.commons.concurrency.CompletableFutures
@@ -55,7 +55,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
5555
protected val rawTableNamespace: String,
5656
private val dialect: SQLDialect,
5757
private val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION,
58-
private val sqlOperations: SqlOperations,
58+
protected val generationHandler: JdbcGenerationHandler,
5959
) : DestinationHandler<DestinationState> {
6060
protected val dslContext: DSLContext
6161
get() = DSL.using(dialect)
@@ -343,13 +343,13 @@ abstract class JdbcDestinationHandler<DestinationState>(
343343
isFinalTableEmpty,
344344
destinationState,
345345
finalTableGenerationId =
346-
sqlOperations.getGenerationIdInTable(
346+
generationHandler.getGenerationIdInTable(
347347
jdbcDatabase,
348348
streamConfig.id.rawNamespace,
349349
streamConfig.id.rawName
350350
),
351351
finalTempTableGenerationId =
352-
sqlOperations.getGenerationIdInTable(
352+
generationHandler.getGenerationIdInTable(
353353
jdbcDatabase,
354354
streamConfig.id.rawNamespace,
355355
streamConfig.id.rawName + AbstractStreamOperation.TMP_TABLE_SUFFIX

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/NoOpJdbcDestinationHandler.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping
66

77
import com.fasterxml.jackson.databind.JsonNode
88
import io.airbyte.cdk.db.jdbc.JdbcDatabase
9-
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations
9+
import io.airbyte.cdk.integrations.destination.jdbc.JdbcGenerationHandler
1010
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
1111
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
1212
import io.airbyte.integrations.base.destination.typing_deduping.Sql
@@ -19,14 +19,14 @@ class NoOpJdbcDestinationHandler<DestinationState>(
1919
jdbcDatabase: JdbcDatabase,
2020
rawTableSchemaName: String,
2121
sqlDialect: SQLDialect,
22-
sqlOperations: SqlOperations,
22+
generationHandler: JdbcGenerationHandler,
2323
) :
2424
JdbcDestinationHandler<DestinationState>(
2525
databaseName,
2626
jdbcDatabase,
2727
rawTableSchemaName,
2828
sqlDialect,
29-
sqlOperations = sqlOperations
29+
generationHandler = generationHandler,
3030
) {
3131

3232
override fun execute(sql: Sql) {

airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.kt

+2
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ class AbstractJdbcDestinationTest {
153153

154154
override fun getSqlOperations(config: JsonNode): SqlOperations = TestJdbcSqlOperations()
155155

156+
override fun getGenerationHandler(): JdbcGenerationHandler = mock()
157+
156158
override fun getDestinationHandler(
157159
config: JsonNode,
158160
databaseName: String,

airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.kt

-9
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,6 @@ class TestJdbcSqlOperations : JdbcSqlOperations() {
2323
// Not required for the testing
2424
}
2525

26-
override fun getGenerationIdInTable(
27-
database: JdbcDatabase,
28-
rawNamespace: String,
29-
rawName: String
30-
): Long {
31-
return -1L
32-
// Not required for the testing
33-
}
34-
3526
override fun overwriteRawTable(database: JdbcDatabase, rawNamespace: String, rawName: String) {
3627
// Not required for the testing
3728
}

0 commit comments

Comments
 (0)