Skip to content

Commit 236960f

Browse files
convert #36333 to kotlin
1 parent 0972a11 commit 236960f

File tree

13 files changed

+257
-487
lines changed

13 files changed

+257
-487
lines changed
Original file line numberDiff line numberDiff line change
@@ -1 +0,0 @@
1-
Original file line numberDiff line numberDiff line change
@@ -1 +0,0 @@
1-

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.java

-144
This file was deleted.

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt

+15-12
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package io.airbyte.cdk.integrations.debezium
66
import com.fasterxml.jackson.databind.JsonNode
77
import io.airbyte.cdk.db.jdbc.JdbcUtils
88
import io.airbyte.cdk.integrations.debezium.internals.*
9+
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator
10+
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency
911
import io.airbyte.commons.util.AutoCloseableIterator
1012
import io.airbyte.commons.util.AutoCloseableIterators
1113
import io.airbyte.protocol.models.v0.AirbyteMessage
@@ -14,13 +16,14 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
1416
import io.airbyte.protocol.models.v0.SyncMode
1517
import io.debezium.engine.ChangeEvent
1618
import io.debezium.engine.DebeziumEngine
19+
import org.slf4j.Logger
20+
import org.slf4j.LoggerFactory
1721
import java.time.Duration
1822
import java.time.Instant
1923
import java.time.temporal.ChronoUnit
2024
import java.util.*
2125
import java.util.concurrent.LinkedBlockingQueue
22-
import org.slf4j.Logger
23-
import org.slf4j.LoggerFactory
26+
2427

2528
/**
2629
* This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
@@ -118,19 +121,19 @@ class AirbyteDebeziumHandler<T>(
118121
if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY))
119122
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY].asLong()
120123
else DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS.toLong()
121-
return AutoCloseableIterators.fromIterator(
122-
DebeziumStateDecoratingIterator(
123-
eventIterator,
124-
cdcStateHandler,
124+
125+
val messageProducer: DebeziumMessageProducer<T> = DebeziumMessageProducer<T>(cdcStateHandler,
125126
targetPosition,
126127
eventConverter,
127128
offsetManager,
128-
trackSchemaHistory,
129-
schemaHistoryManager.orElse(null),
130-
syncCheckpointDuration,
131-
syncCheckpointRecords
132-
)
133-
)
129+
schemaHistoryManager)
130+
131+
132+
// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is not used
133+
// at all thus we will pass in null.
134+
val iterator: SourceStateIterator<ChangeEventWithMetadata> =
135+
SourceStateIterator<ChangeEventWithMetadata>(eventIterator, null, messageProducer!!, StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration))
136+
return AutoCloseableIterators.fromIterator<AirbyteMessage>(iterator)
134137
}
135138

136139
companion object {

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcStateHandler.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage
1212
*/
1313
interface CdcStateHandler {
1414
fun saveState(
15-
offset: Map<String?, String?>?,
15+
offset: Map<String, String>,
1616
dbHistory: AirbyteSchemaHistoryStorage.SchemaHistory<String>?
1717
): AirbyteMessage?
1818

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ interface CdcTargetPosition<T> {
5858
* @return Returns `true` when the event is ahead of the offset. Otherwise, it returns `false`
5959
*/
6060
fun isEventAheadOffset(
61-
offset: Map<String?, String?>?,
61+
offset: Map<String, String>?,
6262
event: ChangeEventWithMetadata?
6363
): Boolean {
6464
return false
@@ -73,7 +73,7 @@ interface CdcTargetPosition<T> {
7373
* @return Returns `true` if both offsets are at the same position. Otherwise, it returns
7474
* `false`
7575
*/
76-
fun isSameOffset(offsetA: Map<String?, String?>?, offsetB: Map<String?, String?>?): Boolean {
76+
fun isSameOffset(offsetA: Map<*, *>, offsetB: Map<String, String>): Boolean {
7777
return false
7878
}
7979
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
package io.airbyte.cdk.integrations.debezium.internals
5+
6+
import io.airbyte.cdk.integrations.debezium.CdcStateHandler
7+
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition
8+
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer
9+
import io.airbyte.protocol.models.v0.AirbyteMessage
10+
import io.airbyte.protocol.models.v0.AirbyteStateMessage
11+
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
12+
import org.apache.kafka.connect.errors.ConnectException
13+
import org.slf4j.Logger
14+
import org.slf4j.LoggerFactory
15+
import java.util.*
16+
17+
class DebeziumMessageProducer<T>(private val cdcStateHandler: CdcStateHandler,
18+
targetPosition: CdcTargetPosition<T>,
19+
eventConverter: DebeziumEventConverter,
20+
offsetManager: AirbyteFileOffsetBackingStore?,
21+
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>) : SourceStateMessageProducer<ChangeEventWithMetadata> {
22+
/**
23+
* `checkpointOffsetToSend` is used as temporal storage for the offset that we want to send as
24+
* message. As Debezium is reading records faster that we process them, if we try to send
25+
* `offsetManger.read()` offset, it is possible that the state is behind the record we are currently
26+
* propagating. To avoid that, we store the offset as soon as we reach the checkpoint threshold
27+
* (time or records) and we wait to send it until we are sure that the record we are processing is
28+
* behind the offset to be sent.
29+
*/
30+
private val checkpointOffsetToSend = HashMap<String, String>()
31+
32+
/**
33+
* `previousCheckpointOffset` is used to make sure we don't send duplicated states with the same
34+
* offset. Is it possible that the offset Debezium report doesn't move for a period of time, and if
35+
* we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states, generating
36+
* an unneeded usage of networking and processing.
37+
*/
38+
private val initialOffset: HashMap<String, String>
39+
private val previousCheckpointOffset: HashMap<String?, String?>
40+
private val offsetManager: AirbyteFileOffsetBackingStore?
41+
private val targetPosition: CdcTargetPosition<T>
42+
private val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>
43+
44+
private var shouldEmitStateMessage = false
45+
46+
private val eventConverter: DebeziumEventConverter
47+
48+
init {
49+
this.targetPosition = targetPosition
50+
this.eventConverter = eventConverter
51+
this.offsetManager = offsetManager
52+
if (offsetManager == null) {
53+
throw RuntimeException("Offset manager cannot be null")
54+
}
55+
this.schemaHistoryManager = schemaHistoryManager
56+
this.previousCheckpointOffset = offsetManager.read() as HashMap<String?, String?>
57+
this.initialOffset = HashMap(this.previousCheckpointOffset)
58+
}
59+
60+
override fun generateStateMessageAtCheckpoint(stream: ConfiguredAirbyteStream?): AirbyteStateMessage {
61+
LOGGER.info("Sending CDC checkpoint state message.")
62+
val stateMessage = createStateMessage(checkpointOffsetToSend)
63+
previousCheckpointOffset.clear()
64+
previousCheckpointOffset.putAll(checkpointOffsetToSend)
65+
checkpointOffsetToSend.clear()
66+
shouldEmitStateMessage = false
67+
return stateMessage
68+
}
69+
70+
/**
71+
* @param stream
72+
* @param message
73+
* @return
74+
*/
75+
override fun processRecordMessage(stream: ConfiguredAirbyteStream?, message: ChangeEventWithMetadata): AirbyteMessage {
76+
if (checkpointOffsetToSend.isEmpty()) {
77+
try {
78+
val temporalOffset = offsetManager!!.read()
79+
if (!targetPosition.isSameOffset(previousCheckpointOffset, temporalOffset)) {
80+
checkpointOffsetToSend.putAll(temporalOffset)
81+
}
82+
} catch (e: ConnectException) {
83+
LOGGER.warn("Offset file is being written by Debezium. Skipping CDC checkpoint in this loop.")
84+
}
85+
}
86+
87+
if (checkpointOffsetToSend.size == 1 && !message!!.isSnapshotEvent) {
88+
if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, message)) {
89+
shouldEmitStateMessage = true
90+
} else {
91+
LOGGER.info("Encountered records with the same event offset.")
92+
}
93+
}
94+
95+
return eventConverter.toAirbyteMessage(message!!)
96+
}
97+
98+
override fun createFinalStateMessage(stream: ConfiguredAirbyteStream?): AirbyteStateMessage {
99+
val syncFinishedOffset = offsetManager!!.read()
100+
if (targetPosition.isSameOffset(initialOffset, syncFinishedOffset)) {
101+
// Edge case where no progress has been made: wrap up the
102+
// sync by returning the initial offset instead of the
103+
// current offset. We do this because we found that
104+
// for some databases, heartbeats will cause Debezium to
105+
// overwrite the offset file with a state which doesn't
106+
// include all necessary data such as snapshot completion.
107+
// This is the case for MS SQL Server, at least.
108+
return createStateMessage(initialOffset)
109+
}
110+
return createStateMessage(syncFinishedOffset)
111+
}
112+
113+
override fun shouldEmitStateMessage(stream: ConfiguredAirbyteStream?): Boolean {
114+
return shouldEmitStateMessage
115+
}
116+
117+
/**
118+
* Creates [AirbyteStateMessage] while updating CDC data, used to checkpoint the state of the
119+
* process.
120+
*
121+
* @return [AirbyteStateMessage] which includes offset and schema history if used.
122+
*/
123+
private fun createStateMessage(offset: Map<String, String>): AirbyteStateMessage {
124+
val message =
125+
cdcStateHandler.saveState(offset, schemaHistoryManager.map { obj: AirbyteSchemaHistoryStorage -> obj.read() }.orElse(null))!!.state
126+
return message
127+
}
128+
129+
companion object {
130+
private val LOGGER: Logger = LoggerFactory.getLogger(DebeziumMessageProducer::class.java)
131+
}
132+
}

0 commit comments

Comments
 (0)