Skip to content

Commit 6372cf6

Browse files
convert #36432 to kotlin 2/2
1 parent ce5d636 commit 6372cf6

File tree

6 files changed

+119
-82
lines changed

6 files changed

+119
-82
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ import java.util.*
2323
import org.slf4j.Logger
2424
import org.slf4j.LoggerFactory
2525

26+
import org.slf4j.Logger
27+
import org.slf4j.LoggerFactory
28+
2629
/** Source operation skeleton for JDBC compatible databases. */
2730
abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
2831
JdbcCompatibleSourceOperations<Datatype> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcMetadataInjector.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ interface CdcMetadataInjector<T> {
3030
throw RuntimeException("Not Supported")
3131
}
3232

33+
/**
 * Hook for adding metadata to a row that was fetched outside of Debezium.
 *
 * This default implementation does not support the operation and always throws.
 * NOTE(review): presumably injectors that need such metadata override it; confirm against the
 * implementers of this interface.
 *
 * @param record the row that would receive the metadata; not read by this default implementation.
 * @throws RuntimeException always, with the message "Not Supported".
 */
fun addMetaDataToRowsFetchedOutsideDebezium(record: ObjectNode?) {
    // kotlin.RuntimeException is a typealias of java.lang.RuntimeException, so the thrown type is
    // unchanged; the unqualified name matches the other "Not Supported" default in this interface.
    throw RuntimeException("Not Supported")
}
36+
3337
/**
3438
* As part of Airbyte record we need to add the namespace (schema name)
3539
*

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt

Lines changed: 89 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import io.airbyte.cdk.db.JdbcCompatibleSourceOperations
1414
import io.airbyte.cdk.db.SqlDatabase
1515
import io.airbyte.cdk.db.factory.DataSourceFactory.close
1616
import io.airbyte.cdk.db.factory.DataSourceFactory.create
17+
import io.airbyte.cdk.db.jdbc.AirbyteRecordData
1718
import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME
1819
import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_SIZE
1920
import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE
@@ -42,6 +43,7 @@ import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto
4243
import io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource
4344
import io.airbyte.cdk.integrations.source.relationaldb.CursorInfo
4445
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
46+
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
4547
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
4648
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager
4749
import io.airbyte.commons.functional.CheckedConsumer
@@ -100,50 +102,73 @@ abstract class AbstractJdbcSource<Datatype>(
100102
tableName: String,
101103
syncMode: SyncMode,
102104
cursorField: Optional<String>
103-
): AutoCloseableIterator<JsonNode>? {
104-
LOGGER.info("Queueing query for table: {}", tableName)
105-
val quoteString = this.quoteString!!
106-
// This corresponds to the initial sync for in INCREMENTAL_MODE, where the ordering of the
107-
// records
108-
// matters
109-
// as intermediate state messages are emitted (if the connector emits intermediate state).
110-
if (syncMode == SyncMode.INCREMENTAL && stateEmissionFrequency > 0) {
111-
val quotedCursorField =
112-
RelationalDbQueryUtils.enquoteIdentifier(cursorField.get(), quoteString)
113-
return RelationalDbQueryUtils.queryTable(
114-
database,
115-
String.format(
116-
"SELECT %s FROM %s ORDER BY %s ASC",
117-
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString),
118-
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
119-
schemaName,
120-
tableName,
121-
quoteString
122-
),
123-
quotedCursorField
124-
),
125-
tableName,
126-
schemaName
127-
)
128-
} else {
129-
// If we are in FULL_REFRESH mode, state messages are never emitted, so we don't care
130-
// about ordering
131-
// of the records.
132-
return RelationalDbQueryUtils.queryTable(
133-
database,
134-
String.format(
135-
"SELECT %s FROM %s",
136-
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString),
137-
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
138-
schemaName,
139-
tableName,
140-
quoteString
105+
): AutoCloseableIterator<AirbyteRecordData> {
106+
AbstractDbSource.LOGGER.info("Queueing query for table: {}", tableName)
107+
val airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName)
108+
return AutoCloseableIterators.lazyIterator<AirbyteRecordData>(
109+
Supplier<AutoCloseableIterator<AirbyteRecordData>> {
110+
try {
111+
val stream =
112+
database.unsafeQuery(
113+
{ connection: Connection ->
114+
AbstractDbSource.LOGGER.info(
115+
"Preparing query for table: {}",
116+
tableName
117+
)
118+
val fullTableName: String =
119+
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
120+
schemaName,
121+
tableName,
122+
quoteString!!
123+
)
124+
125+
val wrappedColumnNames =
126+
getWrappedColumnNames(
127+
database,
128+
connection,
129+
columnNames,
130+
schemaName,
131+
tableName
132+
)
133+
val sql =
134+
java.lang.StringBuilder(
135+
String.format(
136+
"SELECT %s FROM %s",
137+
wrappedColumnNames,
138+
fullTableName
139+
)
140+
)
141+
// if the connector emits intermediate states, the incremental query
142+
// must be sorted by the cursor
143+
// field
144+
if (
145+
syncMode == SyncMode.INCREMENTAL && stateEmissionFrequency > 0
146+
) {
147+
val quotedCursorField: String =
148+
enquoteIdentifier(cursorField.get(), quoteString)
149+
sql.append(String.format(" ORDER BY %s ASC", quotedCursorField))
150+
}
151+
152+
val preparedStatement = connection.prepareStatement(sql.toString())
153+
AbstractDbSource.LOGGER.info(
154+
"Executing query for table {}: {}",
155+
tableName,
156+
preparedStatement
157+
)
158+
preparedStatement
159+
},
160+
sourceOperations::convertDatabaseRowToAirbyteRecordData
161+
)
162+
return@Supplier AutoCloseableIterators.fromStream<AirbyteRecordData>(
163+
stream,
164+
airbyteStream
141165
)
142-
),
143-
tableName,
144-
schemaName
145-
)
146-
}
166+
} catch (e: SQLException) {
167+
throw java.lang.RuntimeException(e)
168+
}
169+
},
170+
airbyteStream
171+
)
147172
}
148173

149174
/**
@@ -433,37 +458,34 @@ abstract class AbstractJdbcSource<Datatype>(
433458
return sourceOperations.isCursorType(type)
434459
}
435460

436-
public override fun queryTableIncremental(
461+
override fun queryTableIncremental(
437462
database: JdbcDatabase,
438463
columnNames: List<String>,
439464
schemaName: String?,
440465
tableName: String,
441466
cursorInfo: CursorInfo,
442467
cursorFieldType: Datatype
443-
): AutoCloseableIterator<JsonNode>? {
444-
LOGGER.info("Queueing query for table: {}", tableName)
468+
): AutoCloseableIterator<AirbyteRecordData> {
469+
AbstractDbSource.LOGGER.info("Queueing query for table: {}", tableName)
445470
val airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName)
446471
return AutoCloseableIterators.lazyIterator(
447472
{
448-
val quoteString = this.quoteString!!
449473
try {
450474
val stream =
451475
database.unsafeQuery(
452-
CheckedFunction<Connection, PreparedStatement, SQLException?> {
453-
connection: Connection ->
454-
LOGGER.info("Preparing query for table: {}", tableName)
455-
val fullTableName =
476+
{ connection: Connection ->
477+
AbstractDbSource.LOGGER.info(
478+
"Preparing query for table: {}",
479+
tableName
480+
)
481+
val fullTableName: String =
456482
RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(
457483
schemaName,
458484
tableName,
459-
quoteString
460-
)
461-
val quotedCursorField =
462-
RelationalDbQueryUtils.enquoteIdentifier(
463-
cursorInfo.cursorField,
464-
quoteString
485+
quoteString!!
465486
)
466-
487+
val quotedCursorField: String =
488+
enquoteIdentifier(cursorInfo.cursorField, quoteString)
467489
val operator: String
468490
if (cursorInfo.cursorRecordCount <= 0L) {
469491
operator = ">"
@@ -476,7 +498,7 @@ abstract class AbstractJdbcSource<Datatype>(
476498
cursorFieldType,
477499
cursorInfo.cursor
478500
)
479-
LOGGER.info(
501+
AbstractDbSource.LOGGER.info(
480502
"Table {} cursor count: expected {}, actual {}",
481503
tableName,
482504
cursorInfo.cursorRecordCount,
@@ -489,7 +511,6 @@ abstract class AbstractJdbcSource<Datatype>(
489511
">="
490512
}
491513
}
492-
493514
val wrappedColumnNames =
494515
getWrappedColumnNames(
495516
database,
@@ -514,9 +535,8 @@ abstract class AbstractJdbcSource<Datatype>(
514535
if (stateEmissionFrequency > 0) {
515536
sql.append(String.format(" ORDER BY %s ASC", quotedCursorField))
516537
}
517-
518538
val preparedStatement = connection.prepareStatement(sql.toString())
519-
LOGGER.info(
539+
AbstractDbSource.LOGGER.info(
520540
"Executing query for table {}: {}",
521541
tableName,
522542
preparedStatement
@@ -529,12 +549,9 @@ abstract class AbstractJdbcSource<Datatype>(
529549
)
530550
preparedStatement
531551
},
532-
CheckedFunction<ResultSet, JsonNode, SQLException?> {
533-
queryResult: ResultSet? ->
534-
sourceOperations.rowToJson(queryResult!!)
535-
}
552+
sourceOperations::convertDatabaseRowToAirbyteRecordData
536553
)
537-
return@lazyIterator AutoCloseableIterators.fromStream<JsonNode>(
554+
return@lazyIterator AutoCloseableIterators.fromStream<AirbyteRecordData>(
538555
stream,
539556
airbyteStream
540557
)
@@ -546,6 +563,10 @@ abstract class AbstractJdbcSource<Datatype>(
546563
)
547564
}
548565

566+
/**
 * Returns the fixed column name `record_count`.
 *
 * The value is read by [getActualCursorRecordCount].
 */
protected fun getCountColumnName(): String = "record_count"
569+
549570
/** Some databases need special column names in the query. */
550571
@Throws(SQLException::class)
551572
protected fun getWrappedColumnNames(
@@ -558,9 +579,6 @@ abstract class AbstractJdbcSource<Datatype>(
558579
return RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString!!)
559580
}
560581

561-
protected val countColumnName: String
562-
get() = "record_count"
563-
564582
@Throws(SQLException::class)
565583
protected fun getActualCursorRecordCount(
566584
connection: Connection,
@@ -569,7 +587,7 @@ abstract class AbstractJdbcSource<Datatype>(
569587
cursorFieldType: Datatype,
570588
cursor: String?
571589
): Long {
572-
val columnName = countColumnName
590+
val columnName = getCountColumnName()
573591
val cursorRecordStatement: PreparedStatement
574592
if (cursor == null) {
575593
val cursorRecordQuery =

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import io.airbyte.cdk.db.AbstractDatabase
1010
import io.airbyte.cdk.db.IncrementalUtils.getCursorField
1111
import io.airbyte.cdk.db.IncrementalUtils.getCursorFieldOptional
1212
import io.airbyte.cdk.db.IncrementalUtils.getCursorType
13+
import io.airbyte.cdk.db.jdbc.AirbyteRecordData
1314
import io.airbyte.cdk.db.jdbc.JdbcDatabase
1415
import io.airbyte.cdk.integrations.JdbcConnector
1516
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility
@@ -416,14 +417,14 @@ protected constructor(driverClassName: String) :
416417
.filter { o: String -> selectedFieldsInCatalog.contains(o) }
417418
.collect(Collectors.toList())
418419

419-
val iterator: AutoCloseableIterator<AirbyteMessage?>
420+
val iterator: AutoCloseableIterator<AirbyteMessage>
420421
// checks for which sync mode we're using based on the configured airbytestream
421422
// this is where the bifurcation between full refresh and incremental
422423
if (airbyteStream.syncMode == SyncMode.INCREMENTAL) {
423424
val cursorField = getCursorField(airbyteStream)
424425
val cursorInfo = stateManager!!.getCursorInfo(pair)
425426

426-
val airbyteMessageIterator: AutoCloseableIterator<AirbyteMessage?>
427+
val airbyteMessageIterator: AutoCloseableIterator<AirbyteMessage>
427428
if (cursorInfo!!.map { it.cursor }.isPresent) {
428429
airbyteMessageIterator =
429430
getIncrementalStream(
@@ -525,7 +526,7 @@ protected constructor(driverClassName: String) :
525526
table: TableInfo<CommonField<DataType>>,
526527
cursorInfo: CursorInfo,
527528
emittedAt: Instant
528-
): AutoCloseableIterator<AirbyteMessage?> {
529+
): AutoCloseableIterator<AirbyteMessage> {
529530
val streamName = airbyteStream.stream.name
530531
val namespace = airbyteStream.stream.namespace
531532
val cursorField = getCursorField(airbyteStream)
@@ -577,7 +578,7 @@ protected constructor(driverClassName: String) :
577578
emittedAt: Instant,
578579
syncMode: SyncMode,
579580
cursorField: Optional<String>
580-
): AutoCloseableIterator<AirbyteMessage?> {
581+
): AutoCloseableIterator<AirbyteMessage> {
581582
val queryStream =
582583
queryTableFullRefresh(
583584
database,
@@ -745,7 +746,7 @@ protected constructor(driverClassName: String) :
745746
tableName: String,
746747
syncMode: SyncMode,
747748
cursorField: Optional<String>
748-
): AutoCloseableIterator<JsonNode>?
749+
): AutoCloseableIterator<AirbyteRecordData>
749750

750751
/**
751752
* Read incremental data from a table. Incremental read should return only records where cursor
@@ -762,7 +763,7 @@ protected constructor(driverClassName: String) :
762763
tableName: String,
763764
cursorInfo: CursorInfo,
764765
cursorFieldType: DataType
765-
): AutoCloseableIterator<JsonNode>?
766+
): AutoCloseableIterator<AirbyteRecordData>
766767

767768
protected val stateEmissionFrequency: Int
768769
/**
@@ -794,28 +795,37 @@ protected constructor(driverClassName: String) :
794795
const val DISCOVER_TRACE_OPERATION_NAME: String = "discover-operation"
795796
const val READ_TRACE_OPERATION_NAME: String = "read-operation"
796797

797-
private val LOGGER: Logger = LoggerFactory.getLogger(AbstractDbSource::class.java)
798+
@JvmStatic
799+
protected val LOGGER: Logger = LoggerFactory.getLogger(AbstractDbSource::class.java)
798800

799801
/**
 * Wraps every [AirbyteRecordData] produced by [recordIterator] in a RECORD-typed
 * [AirbyteMessage].
 *
 * @param recordIterator iterator whose elements supply the row data and meta of each message.
 * @param streamName stream name set on every record message and used for the stream pair.
 * @param namespace namespace set on every record message and used for the stream pair.
 * @param emittedAt value passed to `withEmittedAt` on every record message.
 * @return the iterator returned by `AutoCloseableIterators.transform` for the given input.
 */
private fun getMessageIterator(
    recordIterator: AutoCloseableIterator<AirbyteRecordData>,
    streamName: String,
    namespace: String,
    emittedAt: Long
): AutoCloseableIterator<AirbyteMessage> {
    val streamPair = AirbyteStreamNameNamespacePair(streamName, namespace)
    return AutoCloseableIterators.transform(recordIterator, streamPair) { rowData ->
        // Meta is attached only when it actually lists changes; otherwise null is passed.
        val metaOrNull = rowData.meta.takeUnless { isMetaChangesEmptyOrNull(it) }
        val recordMessage =
            AirbyteRecordMessage()
                .withStream(streamName)
                .withNamespace(namespace)
                .withEmittedAt(emittedAt)
                .withData(rowData.rawRowData)
                .withMeta(metaOrNull)
        AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(recordMessage)
    }
}
826+
827+
/**
 * Tells whether [meta] carries no change entries.
 *
 * @param meta record meta to inspect; may be null.
 * @return `true` when [meta] is null, or when its `changes` list is null or empty.
 */
private fun isMetaChangesEmptyOrNull(meta: AirbyteRecordMessageMeta?): Boolean =
    meta?.changes.isNullOrEmpty()
820830
}
821831
}

0 commit comments

Comments
 (0)