Skip to content

Commit f302124

Browse files
committed
CDK module changes for destination
1 parent 27c225a commit f302124

File tree

33 files changed

+141
-118
lines changed

33 files changed

+141
-118
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/ConnectionFactory.kt

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ object ConnectionFactory {
2323
* @param jdbcConnectionString The JDBC connection string.
2424
* @return The configured [Connection]
2525
*/
26+
@JvmStatic
2627
fun create(
2728
username: String?,
2829
password: String?,

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DatabaseDriver.kt

+1-4
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
package io.airbyte.cdk.db.factory
55

66
/** Collection of JDBC driver class names and the associated JDBC URL format string. */
7-
enum class DatabaseDriver(
8-
@JvmField val driverClassName: String,
9-
@JvmField val urlFormatString: String
10-
) {
7+
enum class DatabaseDriver(val driverClassName: String, val urlFormatString: String) {
118
CLICKHOUSE("com.clickhouse.jdbc.ClickHouseDriver", "jdbc:clickhouse:%s://%s:%d/%s"),
129
DATABRICKS(
1310
"com.databricks.client.jdbc.Driver",

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt

+9-4
Original file line numberDiff line numberDiff line change
@@ -213,15 +213,20 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
213213
}
214214

215215
@Throws(SQLException::class)
216-
protected fun putTime(node: ObjectNode, columnName: String?, resultSet: ResultSet, index: Int) {
216+
protected open fun putTime(
217+
node: ObjectNode,
218+
columnName: String?,
219+
resultSet: ResultSet,
220+
index: Int
221+
) {
217222
node.put(
218223
columnName,
219224
DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime::class.java))
220225
)
221226
}
222227

223228
@Throws(SQLException::class)
224-
protected fun putTimestamp(
229+
protected open fun putTimestamp(
225230
node: ObjectNode,
226231
columnName: String?,
227232
resultSet: ResultSet,
@@ -419,7 +424,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
419424
}
420425

421426
@Throws(SQLException::class)
422-
protected fun putTimeWithTimezone(
427+
protected open fun putTimeWithTimezone(
423428
node: ObjectNode,
424429
columnName: String?,
425430
resultSet: ResultSet,
@@ -430,7 +435,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
430435
}
431436

432437
@Throws(SQLException::class)
433-
protected fun putTimestampWithTimezone(
438+
protected open fun putTimestampWithTimezone(
434439
node: ObjectNode,
435440
columnName: String?,
436441
resultSet: ResultSet,

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import org.slf4j.Logger
1515
import org.slf4j.LoggerFactory
1616

1717
/** Implementation of source operations with standard JDBC types. */
18-
class JdbcSourceOperations :
18+
open class JdbcSourceOperations :
1919
AbstractJdbcCompatibleSourceOperations<JDBCType>(), SourceOperations<ResultSet, JDBCType> {
2020
protected fun safeGetJdbcType(columnTypeInt: Int): JDBCType {
2121
return try {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshTunnel.kt

+5
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ constructor(
422422

423423
const val TIMEOUT_MILLIS: Int = 15000 // 15 seconds
424424

425+
@JvmStatic
425426
fun getInstance(config: JsonNode, hostKey: List<String>, portKey: List<String>): SshTunnel {
426427
val tunnelMethod =
427428
Jsons.getOptional(config, "tunnel_method", "tunnel_method")
@@ -515,6 +516,7 @@ constructor(
515516
)
516517
}
517518

519+
@JvmStatic
518520
@Throws(Exception::class)
519521
fun sshWrap(
520522
config: JsonNode,
@@ -528,6 +530,7 @@ constructor(
528530
}
529531
}
530532

533+
@JvmStatic
531534
@Throws(Exception::class)
532535
fun sshWrap(
533536
config: JsonNode,
@@ -540,6 +543,7 @@ constructor(
540543
}
541544
}
542545

546+
@JvmStatic
543547
@Throws(Exception::class)
544548
fun <T> sshWrap(
545549
config: JsonNode,
@@ -552,6 +556,7 @@ constructor(
552556
}
553557
}
554558

559+
@JvmStatic
555560
@Throws(Exception::class)
556561
fun <T> sshWrap(
557562
config: JsonNode,

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt

+6-5
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory
5050

5151
abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationState>(
5252
driverClass: String,
53-
protected val namingResolver: NamingConventionTransformer,
53+
protected open val namingResolver: NamingConventionTransformer,
5454
protected val sqlOperations: SqlOperations
5555
) : JdbcConnector(driverClass), Destination {
5656
protected val configSchemaKey: String
@@ -106,14 +106,14 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
106106
* @throws Exception
107107
*/
108108
@Throws(Exception::class)
109-
protected fun destinationSpecificTableOperations(database: JdbcDatabase?) {}
109+
protected open fun destinationSpecificTableOperations(database: JdbcDatabase?) {}
110110

111111
/**
112112
* Subclasses which need to modify the DataSource should override [.modifyDataSourceBuilder]
113113
* rather than this method.
114114
*/
115115
@VisibleForTesting
116-
fun getDataSource(config: JsonNode): DataSource {
116+
open fun getDataSource(config: JsonNode): DataSource {
117117
val jdbcConfig = toJdbcConfig(config)
118118
val connectionProperties = getConnectionProperties(config)
119119
val builder =
@@ -137,7 +137,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
137137
}
138138

139139
@VisibleForTesting
140-
fun getDatabase(dataSource: DataSource): JdbcDatabase {
140+
open fun getDatabase(dataSource: DataSource): JdbcDatabase {
141141
return DefaultJdbcDatabase(dataSource)
142142
}
143143

@@ -195,7 +195,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
195195
return config[JdbcUtils.DATABASE_KEY].asText()
196196
}
197197

198-
protected fun getDataTransformer(
198+
protected open fun getDataTransformer(
199199
parsedCatalog: ParsedCatalog?,
200200
defaultNamespace: String?
201201
): StreamAwareDataTransformer {
@@ -343,6 +343,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
343343
* - set true if need to make attempt to insert dummy records to newly created table. Set
344344
* false to skip insert step.
345345
*/
346+
@JvmStatic
346347
@Throws(Exception::class)
347348
fun attemptTableOperations(
348349
outputSchema: String?,

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ abstract class JdbcSqlOperations : SqlOperations {
8686
return listOf()
8787
}
8888

89-
protected fun createTableQueryV1(schemaName: String?, tableName: String?): String {
89+
protected open fun createTableQueryV1(schemaName: String?, tableName: String?): String {
9090
return String.format(
9191
"""
9292
CREATE TABLE IF NOT EXISTS %s.%s (
@@ -104,7 +104,7 @@ abstract class JdbcSqlOperations : SqlOperations {
104104
)
105105
}
106106

107-
protected fun createTableQueryV2(schemaName: String?, tableName: String?): String {
107+
protected open fun createTableQueryV2(schemaName: String?, tableName: String?): String {
108108
// Note that Meta is the last column in order, there was a time when tables didn't have
109109
// meta,
110110
// we issued Alter to add that column so it should be the last column.

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.kt

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ object SqlOperationsUtils {
2929
* @param records records to write
3030
* @throws SQLException exception
3131
*/
32+
@JvmStatic
3233
@Throws(SQLException::class)
3334
fun insertRawRecordsInSingleQuery(
3435
insertQueryComponent: String?,

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/SwitchingDestination.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory
2828
* This class exists to make it easy to define a destination in terms of multiple other destination
2929
* implementations, switching between them based on the config provided.
3030
*/
31-
class SwitchingDestination<T : Enum<T>>(
31+
open class SwitchingDestination<T : Enum<T>>(
3232
enumClass: Class<T>,
3333
configToType: Function<JsonNode, T>,
3434
typeToDestination: Map<T, Destination>

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt

+4-7
Original file line numberDiff line numberDiff line change
@@ -263,24 +263,21 @@ abstract class JdbcDestinationHandler<DestinationState>(
263263
// We can't directly call record.set here, because that will raise a
264264
// ConcurrentModificationException on the fieldnames iterator.
265265
// Instead, build up a map of new fields and set them all at once.
266-
newFields.put(fieldName.lowercase(Locale.getDefault()), record[fieldName])
266+
newFields[fieldName.lowercase(Locale.getDefault())] = record[fieldName]
267267
}
268268

269269
record.setAll<JsonNode>(newFields)
270270
}
271271
.collect(
272272
toMap(
273273
{ record ->
274-
val nameNode: JsonNode = record.get(DESTINATION_STATE_TABLE_COLUMN_NAME)
275-
val namespaceNode: JsonNode =
276-
record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)
277274
AirbyteStreamNameNamespacePair(
278-
if (nameNode != null) nameNode.asText() else null,
279-
if (namespaceNode != null) namespaceNode.asText() else null
275+
record.get(DESTINATION_STATE_TABLE_COLUMN_NAME)?.asText(),
276+
record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)?.asText()
280277
)
281278
},
282279
{ record ->
283-
val stateNode: JsonNode =
280+
val stateNode: JsonNode? =
284281
record.get(DESTINATION_STATE_TABLE_COLUMN_STATE)
285282
val state =
286283
if (stateNode != null) Jsons.deserialize(stateNode.asText())

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
144144
cursorField: Optional<ColumnId>
145145
): Field<Int>
146146

147-
protected val dslContext: DSLContext
147+
protected open val dslContext: DSLContext
148148
get() = DSL.using(dialect)
149149

150150
/**
@@ -596,7 +596,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
596596
.getSQL(ParamType.INLINED)
597597
}
598598

599-
protected fun castedField(
599+
protected open fun castedField(
600600
field: Field<*>?,
601601
type: AirbyteType,
602602
alias: String?,
@@ -625,7 +625,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio
625625
return DSL.cast(field, toDialectType(type))
626626
}
627627

628-
protected fun currentTimestamp(): Field<Timestamp> {
628+
protected open fun currentTimestamp(): Field<Timestamp> {
629629
return DSL.currentTimestamp()
630630
}
631631

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ abstract class DestinationAcceptanceTest {
9090
protected var localRoot: Path? = null
9191
open protected var _testDataComparator: TestDataComparator = getTestDataComparator()
9292

93-
open fun getTestDataComparator(): TestDataComparator {
93+
protected open fun getTestDataComparator(): TestDataComparator {
9494
return BasicTestDataComparator { this.resolveIdentifier(it) }
9595
}
9696

@@ -102,7 +102,7 @@ abstract class DestinationAcceptanceTest {
102102
*/
103103
get
104104

105-
protected fun supportsInDestinationNormalization(): Boolean {
105+
protected open fun supportsInDestinationNormalization(): Boolean {
106106
return false
107107
}
108108

@@ -208,7 +208,7 @@ abstract class DestinationAcceptanceTest {
208208
/**
209209
* Override to return true if a destination implements namespaces and should be tested as such.
210210
*/
211-
protected fun implementsNamespaces(): Boolean {
211+
protected open fun implementsNamespaces(): Boolean {
212212
return false
213213
}
214214

@@ -308,7 +308,7 @@ abstract class DestinationAcceptanceTest {
308308
* - can throw any exception, test framework will handle.
309309
*/
310310
@Throws(Exception::class)
311-
protected fun retrieveNormalizedRecords(
311+
protected open fun retrieveNormalizedRecords(
312312
testEnv: TestDestinationEnv?,
313313
streamName: String?,
314314
namespace: String?
@@ -1036,7 +1036,7 @@ abstract class DestinationAcceptanceTest {
10361036
}
10371037
}
10381038

1039-
protected val maxRecordValueLimit: Int
1039+
protected open val maxRecordValueLimit: Int
10401040
/** @return the max limit length allowed for values in the destination. */
10411041
get() = 1000000000
10421042

@@ -1953,7 +1953,7 @@ abstract class DestinationAcceptanceTest {
19531953
return false
19541954
}
19551955

1956-
protected fun supportIncrementalSchemaChanges(): Boolean {
1956+
protected open fun supportIncrementalSchemaChanges(): Boolean {
19571957
return false
19581958
}
19591959

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/TestingNamespaces.kt

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ object TestingNamespaces {
3838
*
3939
* @return convention-compliant namespace
4040
*/
41+
@JvmStatic
4142
@JvmOverloads
4243
fun generate(prefix: String? = null): String {
4344
val userDefinedPrefix = if (prefix != null) prefix + "_" else ""
@@ -58,6 +59,7 @@ object TestingNamespaces {
5859
* @param namespace to check
5960
* @return true if the namespace is older than 2 days, otherwise false
6061
*/
62+
@JvmStatic
6163
fun isOlderThan2Days(namespace: String): Boolean {
6264
return isOlderThan(namespace, 2, ChronoUnit.DAYS)
6365
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ open class AdvancedTestDataComparator : TestDataComparator {
2424
}
2525
}
2626

27-
protected fun resolveIdentifier(identifier: String?): List<String?> {
27+
protected open fun resolveIdentifier(identifier: String?): List<String?> {
2828
return java.util.List.of(identifier)
2929
}
3030

@@ -174,7 +174,7 @@ open class AdvancedTestDataComparator : TestDataComparator {
174174
.withZoneSameInstant(ZoneOffset.UTC)
175175
}
176176

177-
protected fun compareDateTimeWithTzValues(
177+
protected open fun compareDateTimeWithTzValues(
178178
airbyteMessageValue: String,
179179
destinationValue: String
180180
): Boolean {

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcTypingDedupingTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
3636

3737
protected abstract fun getDataSource(config: JsonNode?): DataSource?
3838

39-
protected val sourceOperations: JdbcCompatibleSourceOperations<*>
39+
protected open val sourceOperations: JdbcCompatibleSourceOperations<*>
4040
/**
4141
* Subclasses may need to return a custom source operations if the default one does not
4242
* handle vendor-specific types correctly. For example, you most likely need to override
4343
* this method to deserialize JSON columns to JsonNode.
4444
*/
4545
get() = JdbcUtils.defaultSourceOperations
4646

47-
protected val rawSchema: String
47+
protected open val rawSchema: String
4848
/**
4949
* Subclasses using a config with a nonstandard raw table schema should override this
5050
* method.

0 commit comments

Comments
 (0)