Skip to content

Commit ec77cc0

Browse files
convert CDK db-sources submodules to kotlin
1 parent fd38ace commit ec77cc0

File tree

1 file changed

+197
-0
lines changed

1 file changed

+197
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
package io.airbyte.cdk.integrations.debezium.internals
5+
6+
import com.google.common.collect.AbstractIterator
7+
import io.airbyte.cdk.integrations.debezium.CdcStateHandler
8+
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition
9+
import io.airbyte.protocol.models.v0.AirbyteMessage
10+
import io.airbyte.protocol.models.v0.AirbyteStateStats
11+
import org.apache.kafka.connect.errors.ConnectException
12+
import org.slf4j.Logger
13+
import org.slf4j.LoggerFactory
14+
import java.time.Duration
15+
import java.time.OffsetDateTime
16+
17+
/**
18+
* This class encapsulates CDC change events and adds the required functionality to create
19+
* checkpoints for CDC replications. That way, if the process fails in the middle of a long sync, it
20+
* will be able to recover for any acknowledged checkpoint in the next syncs.
21+
*/
22+
class DebeziumStateDecoratingIterator<T>(private val changeEventIterator: Iterator<ChangeEventWithMetadata>,
23+
private val cdcStateHandler: CdcStateHandler,
24+
private val targetPosition: CdcTargetPosition<T>,
25+
private val eventConverter: DebeziumEventConverter,
26+
offsetManager: AirbyteFileOffsetBackingStore,
27+
private val trackSchemaHistory: Boolean,
28+
private val schemaHistoryManager: AirbyteSchemaHistoryStorage?,
29+
checkpointDuration: Duration,
30+
checkpointRecords: Long) : AbstractIterator<AirbyteMessage?>(), MutableIterator<AirbyteMessage?> {
31+
private val offsetManager: AirbyteFileOffsetBackingStore? = offsetManager
32+
private var isSyncFinished = false
33+
34+
/**
35+
* These parameters control when a checkpoint message has to be sent in a CDC integration. We can
36+
* emit a checkpoint when any of the following two conditions are met.
37+
*
38+
*
39+
* 1. The amount of records in the current loop (`SYNC_CHECKPOINT_RECORDS`) is higher than a
40+
* threshold defined by `SYNC_CHECKPOINT_RECORDS`.
41+
*
42+
*
43+
* 2. Time between checkpoints (`dateTimeLastSync`) is higher than a `Duration` defined
44+
* at `SYNC_CHECKPOINT_SECONDS`.
45+
*
46+
*
47+
*/
48+
private val syncCheckpointDuration = checkpointDuration
49+
private val syncCheckpointRecords = checkpointRecords
50+
private var dateTimeLastSync: OffsetDateTime? = null
51+
private var recordsLastSync: Long = 0
52+
private var recordsAllSyncs: Long = 0
53+
private var sendCheckpointMessage = false
54+
55+
/**
56+
* `checkpointOffsetToSend` is used as temporal storage for the offset that we want to send as
57+
* message. As Debezium is reading records faster that we process them, if we try to send
58+
* `offsetManger.read()` offset, it is possible that the state is behind the record we are currently
59+
* propagating. To avoid that, we store the offset as soon as we reach the checkpoint threshold
60+
* (time or records) and we wait to send it until we are sure that the record we are processing is
61+
* behind the offset to be sent.
62+
*/
63+
private val checkpointOffsetToSend = HashMap<String?, String?>()
64+
65+
/**
66+
* `previousCheckpointOffset` is used to make sure we don't send duplicated states with the same
67+
* offset. Is it possible that the offset Debezium report doesn't move for a period of time, and if
68+
* we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states, generating
69+
* an unneeded usage of networking and processing.
70+
*/
71+
private val initialOffset: HashMap<String?, String?>
72+
private val previousCheckpointOffset: HashMap<String?, String?>? = offsetManager.read() as HashMap<String?, String?>
73+
74+
/**
75+
* @param changeEventIterator Base iterator that we want to enrich with checkpoint messages
76+
* @param cdcStateHandler Handler to save the offset and schema history
77+
* @param offsetManager Handler to read and write debezium offset file
78+
* @param eventConverter Handler to transform debezium events into Airbyte messages.
79+
* @param trackSchemaHistory Set true if the schema needs to be tracked
80+
* @param schemaHistoryManager Handler to write schema. Needs to be initialized if
81+
* trackSchemaHistory is set to true
82+
* @param checkpointDuration Duration object with time between syncs
83+
* @param checkpointRecords Number of records between syncs
84+
*/
85+
init {
86+
this.initialOffset = HashMap(this.previousCheckpointOffset)
87+
resetCheckpointValues()
88+
}
89+
90+
/**
91+
* Computes the next record retrieved from Source stream. Emits state messages as checkpoints based
92+
* on number of records or time lapsed.
93+
*
94+
*
95+
*
96+
* If this method throws an exception, it will propagate outward to the `hasNext` or
97+
* `next` invocation that invoked this method. Any further attempts to use the iterator will
98+
* result in an [IllegalStateException].
99+
*
100+
*
101+
* @return [AirbyteStateMessage] containing CDC data or state checkpoint message.
102+
*/
103+
override fun computeNext(): AirbyteMessage? {
104+
if (isSyncFinished) {
105+
return endOfData()
106+
}
107+
108+
if (cdcStateHandler.isCdcCheckpointEnabled && sendCheckpointMessage) {
109+
LOGGER.info("Sending CDC checkpoint state message.")
110+
val stateMessage = createStateMessage(checkpointOffsetToSend, recordsLastSync)
111+
previousCheckpointOffset!!.clear()
112+
previousCheckpointOffset.putAll(checkpointOffsetToSend)
113+
resetCheckpointValues()
114+
return stateMessage
115+
}
116+
117+
if (changeEventIterator.hasNext()) {
118+
val event = changeEventIterator.next()
119+
120+
if (cdcStateHandler.isCdcCheckpointEnabled) {
121+
if (checkpointOffsetToSend.isEmpty() &&
122+
(recordsLastSync >= syncCheckpointRecords ||
123+
Duration.between(dateTimeLastSync, OffsetDateTime.now()).compareTo(syncCheckpointDuration) > 0)) {
124+
// Using temporal variable to avoid reading teh offset twice, one in the condition and another in
125+
// the assignation
126+
try {
127+
val temporalOffset = offsetManager!!.read() as HashMap<String?, String?>
128+
if (!targetPosition.isSameOffset(previousCheckpointOffset, temporalOffset)) {
129+
checkpointOffsetToSend.putAll(temporalOffset)
130+
}
131+
} catch (e: ConnectException) {
132+
LOGGER.warn("Offset file is being written by Debezium. Skipping CDC checkpoint in this loop.")
133+
}
134+
}
135+
136+
if (checkpointOffsetToSend.size == 1 && changeEventIterator.hasNext()
137+
&& !event.isSnapshotEvent) {
138+
if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, event)) {
139+
sendCheckpointMessage = true
140+
} else {
141+
LOGGER.info("Encountered {} records with the same event offset", recordsLastSync)
142+
}
143+
}
144+
}
145+
recordsLastSync++
146+
recordsAllSyncs++
147+
return eventConverter.toAirbyteMessage(event)
148+
}
149+
150+
isSyncFinished = true
151+
val syncFinishedOffset = offsetManager!!.read() as HashMap<String?, String?>
152+
if (recordsAllSyncs == 0L && targetPosition.isSameOffset(initialOffset, syncFinishedOffset)) {
153+
// Edge case where no progress has been made: wrap up the
154+
// sync by returning the initial offset instead of the
155+
// current offset. We do this because we found that
156+
// for some databases, heartbeats will cause Debezium to
157+
// overwrite the offset file with a state which doesn't
158+
// include all necessary data such as snapshot completion.
159+
// This is the case for MS SQL Server, at least.
160+
return createStateMessage(initialOffset, 0)
161+
}
162+
return createStateMessage(syncFinishedOffset, recordsLastSync)
163+
}
164+
165+
/**
166+
* Initialize or reset the checkpoint variables.
167+
*/
168+
private fun resetCheckpointValues() {
169+
sendCheckpointMessage = false
170+
checkpointOffsetToSend.clear()
171+
recordsLastSync = 0L
172+
dateTimeLastSync = OffsetDateTime.now()
173+
}
174+
175+
/**
176+
* Creates [AirbyteStateMessage] while updating CDC data, used to checkpoint the state of the
177+
* process.
178+
*
179+
* @return [AirbyteStateMessage] which includes offset and schema history if used.
180+
*/
181+
private fun createStateMessage(offset: Map<String?, String?>?, recordCount: Long): AirbyteMessage? {
182+
if (trackSchemaHistory && schemaHistoryManager == null) {
183+
throw RuntimeException("Schema History Tracking is true but manager is not initialised")
184+
}
185+
if (offsetManager == null) {
186+
throw RuntimeException("Offset can not be null")
187+
}
188+
189+
val message = cdcStateHandler.saveState(offset, schemaHistoryManager?.read())
190+
message!!.state.withSourceStats(AirbyteStateStats().withRecordCount(recordCount.toDouble()))
191+
return message
192+
}
193+
194+
companion object {
195+
private val LOGGER: Logger = LoggerFactory.getLogger(DebeziumStateDecoratingIterator::class.java)
196+
}
197+
}

0 commit comments

Comments
 (0)