Skip to content

Commit 75c80ce

Browse files
convert #36333 to kotlin
1 parent b618796 commit 75c80ce

File tree

14 files changed

+95
-264
lines changed

14 files changed

+95
-264
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt

-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@ import java.util.*
2323
import org.slf4j.Logger
2424
import org.slf4j.LoggerFactory
2525

26-
import org.slf4j.Logger
27-
import org.slf4j.LoggerFactory
28-
2926
/** Source operation skeleton for JDBC compatible databases. */
3027
abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
3128
JdbcCompatibleSourceOperations<Datatype> {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class JdbcSourceOperations :
6565
preparedStatement: PreparedStatement,
6666
parameterIndex: Int,
6767
cursorFieldType: JDBCType?,
68-
value: String?
68+
value: String
6969
) {
7070
when (cursorFieldType) {
7171
JDBCType.TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value)
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt

+15-7
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
1616
import io.airbyte.protocol.models.v0.SyncMode
1717
import io.debezium.engine.ChangeEvent
1818
import io.debezium.engine.DebeziumEngine
19-
import org.slf4j.Logger
20-
import org.slf4j.LoggerFactory
2119
import java.time.Duration
2220
import java.time.Instant
2321
import java.time.temporal.ChronoUnit
2422
import java.util.*
2523
import java.util.concurrent.LinkedBlockingQueue
24+
import org.slf4j.Logger
25+
import org.slf4j.LoggerFactory
2626

2727
/**
2828
* This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
@@ -121,17 +121,25 @@ class AirbyteDebeziumHandler<T>(
121121
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY].asLong()
122122
else DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS.toLong()
123123

124-
val messageProducer: DebeziumMessageProducer<T> = DebeziumMessageProducer<T>(cdcStateHandler,
124+
val messageProducer: DebeziumMessageProducer<T> =
125+
DebeziumMessageProducer<T>(
126+
cdcStateHandler,
125127
targetPosition,
126128
eventConverter,
127129
offsetManager,
128-
schemaHistoryManager)
129-
130+
schemaHistoryManager
131+
)
130132

131-
// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is not used
133+
// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is
134+
// not used
132135
// at all thus we will pass in null.
133136
val iterator: SourceStateIterator<ChangeEventWithMetadata> =
134-
SourceStateIterator<ChangeEventWithMetadata>(eventIterator, null, messageProducer!!, StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration))
137+
SourceStateIterator<ChangeEventWithMetadata>(
138+
eventIterator,
139+
null,
140+
messageProducer!!,
141+
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)
142+
)
135143
return AutoCloseableIterators.fromIterator<AirbyteMessage>(iterator)
136144
}
137145

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt

+1-4
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,7 @@ interface CdcTargetPosition<T> {
5757
* @param event Event from the CDC load
5858
* @return Returns `true` when the event is ahead of the offset. Otherwise, it returns `false`
5959
*/
60-
fun isEventAheadOffset(
61-
offset: Map<String, String>?,
62-
event: ChangeEventWithMetadata?
63-
): Boolean {
60+
fun isEventAheadOffset(offset: Map<String, String>?, event: ChangeEventWithMetadata?): Boolean {
6461
return false
6562
}
6663

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.kt

+33-17
Original file line numberDiff line numberDiff line change
@@ -9,31 +9,33 @@ import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageP
99
import io.airbyte.protocol.models.v0.AirbyteMessage
1010
import io.airbyte.protocol.models.v0.AirbyteStateMessage
1111
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
12+
import java.util.*
1213
import org.apache.kafka.connect.errors.ConnectException
1314
import org.slf4j.Logger
1415
import org.slf4j.LoggerFactory
15-
import java.util.*
1616

17-
class DebeziumMessageProducer<T>(private val cdcStateHandler: CdcStateHandler,
18-
targetPosition: CdcTargetPosition<T>,
19-
eventConverter: DebeziumEventConverter,
20-
offsetManager: AirbyteFileOffsetBackingStore?,
21-
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>) : SourceStateMessageProducer<ChangeEventWithMetadata> {
17+
class DebeziumMessageProducer<T>(
18+
private val cdcStateHandler: CdcStateHandler,
19+
targetPosition: CdcTargetPosition<T>,
20+
eventConverter: DebeziumEventConverter,
21+
offsetManager: AirbyteFileOffsetBackingStore?,
22+
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>
23+
) : SourceStateMessageProducer<ChangeEventWithMetadata> {
2224
/**
2325
* `checkpointOffsetToSend` is used as temporal storage for the offset that we want to send as
2426
* message. As Debezium is reading records faster that we process them, if we try to send
25-
* `offsetManger.read()` offset, it is possible that the state is behind the record we are currently
26-
* propagating. To avoid that, we store the offset as soon as we reach the checkpoint threshold
27-
* (time or records) and we wait to send it until we are sure that the record we are processing is
28-
* behind the offset to be sent.
27+
* `offsetManger.read()` offset, it is possible that the state is behind the record we are
28+
* currently propagating. To avoid that, we store the offset as soon as we reach the checkpoint
29+
* threshold (time or records) and we wait to send it until we are sure that the record we are
30+
* processing is behind the offset to be sent.
2931
*/
3032
private val checkpointOffsetToSend = HashMap<String, String>()
3133

3234
/**
3335
* `previousCheckpointOffset` is used to make sure we don't send duplicated states with the same
34-
* offset. Is it possible that the offset Debezium report doesn't move for a period of time, and if
35-
* we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states, generating
36-
* an unneeded usage of networking and processing.
36+
* offset. Is it possible that the offset Debezium report doesn't move for a period of time, and
37+
* if we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states,
38+
* generating an unneeded usage of networking and processing.
3739
*/
3840
private val initialOffset: HashMap<String, String>
3941
private val previousCheckpointOffset: HashMap<String?, String?>
@@ -57,7 +59,9 @@ class DebeziumMessageProducer<T>(private val cdcStateHandler: CdcStateHandler,
5759
this.initialOffset = HashMap(this.previousCheckpointOffset)
5860
}
5961

60-
override fun generateStateMessageAtCheckpoint(stream: ConfiguredAirbyteStream?): AirbyteStateMessage {
62+
override fun generateStateMessageAtCheckpoint(
63+
stream: ConfiguredAirbyteStream?
64+
): AirbyteStateMessage {
6165
LOGGER.info("Sending CDC checkpoint state message.")
6266
val stateMessage = createStateMessage(checkpointOffsetToSend)
6367
previousCheckpointOffset.clear()
@@ -72,15 +76,20 @@ class DebeziumMessageProducer<T>(private val cdcStateHandler: CdcStateHandler,
7276
* @param message
7377
* @return
7478
*/
75-
override fun processRecordMessage(stream: ConfiguredAirbyteStream?, message: ChangeEventWithMetadata): AirbyteMessage {
79+
override fun processRecordMessage(
80+
stream: ConfiguredAirbyteStream?,
81+
message: ChangeEventWithMetadata
82+
): AirbyteMessage {
7683
if (checkpointOffsetToSend.isEmpty()) {
7784
try {
7885
val temporalOffset = offsetManager!!.read()
7986
if (!targetPosition.isSameOffset(previousCheckpointOffset, temporalOffset)) {
8087
checkpointOffsetToSend.putAll(temporalOffset)
8188
}
8289
} catch (e: ConnectException) {
83-
LOGGER.warn("Offset file is being written by Debezium. Skipping CDC checkpoint in this loop.")
90+
LOGGER.warn(
91+
"Offset file is being written by Debezium. Skipping CDC checkpoint in this loop."
92+
)
8493
}
8594
}
8695

@@ -122,7 +131,14 @@ class DebeziumMessageProducer<T>(private val cdcStateHandler: CdcStateHandler,
122131
*/
123132
private fun createStateMessage(offset: Map<String, String>): AirbyteStateMessage {
124133
val message =
125-
cdcStateHandler.saveState(offset, schemaHistoryManager.map { obj: AirbyteSchemaHistoryStorage -> obj.read() }.orElse(null))!!.state
134+
cdcStateHandler
135+
.saveState(
136+
offset,
137+
schemaHistoryManager
138+
.map { obj: AirbyteSchemaHistoryStorage -> obj.read() }
139+
.orElse(null)
140+
)!!
141+
.state
126142
return message
127143
}
128144

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.kt

-211
This file was deleted.

0 commit comments

Comments
 (0)