Skip to content

convert #36333 to kotlin #36520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package io.airbyte.cdk.integrations.debezium
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.db.jdbc.JdbcUtils
import io.airbyte.cdk.integrations.debezium.internals.*
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency
import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.commons.util.AutoCloseableIterators
import io.airbyte.protocol.models.v0.AirbyteMessage
Expand All @@ -14,13 +16,14 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
import io.debezium.engine.ChangeEvent
import io.debezium.engine.DebeziumEngine
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import org.slf4j.Logger
import org.slf4j.LoggerFactory


/**
* This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
Expand Down Expand Up @@ -118,19 +121,19 @@ class AirbyteDebeziumHandler<T>(
if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY))
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY].asLong()
else DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS.toLong()
return AutoCloseableIterators.fromIterator(
DebeziumStateDecoratingIterator(
eventIterator,
cdcStateHandler,

val messageProducer: DebeziumMessageProducer<T> = DebeziumMessageProducer<T>(cdcStateHandler,
targetPosition,
eventConverter,
offsetManager,
trackSchemaHistory,
schemaHistoryManager.orElse(null),
syncCheckpointDuration,
syncCheckpointRecords
)
)
schemaHistoryManager)


// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is not used
// at all thus we will pass in null.
val iterator: SourceStateIterator<ChangeEventWithMetadata> =
SourceStateIterator<ChangeEventWithMetadata>(eventIterator, null, messageProducer!!, StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration))
return AutoCloseableIterators.fromIterator<AirbyteMessage>(iterator)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage
*/
interface CdcStateHandler {
fun saveState(
offset: Map<String?, String?>?,
offset: Map<String, String>,
dbHistory: AirbyteSchemaHistoryStorage.SchemaHistory<String>?
): AirbyteMessage?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ interface CdcTargetPosition<T> {
* @return Returns `true` when the event is ahead of the offset. Otherwise, it returns `false`
*/
fun isEventAheadOffset(
offset: Map<String?, String?>?,
offset: Map<String, String>?,
event: ChangeEventWithMetadata?
): Boolean {
return false
Expand All @@ -73,7 +73,7 @@ interface CdcTargetPosition<T> {
* @return Returns `true` if both offsets are at the same position. Otherwise, it returns
* `false`
*/
fun isSameOffset(offsetA: Map<String?, String?>?, offsetB: Map<String?, String?>?): Boolean {
fun isSameOffset(offsetA: Map<*, *>, offsetB: Map<String, String>): Boolean {
return false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.integrations.debezium.internals

import io.airbyte.cdk.integrations.debezium.CdcStateHandler
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import org.apache.kafka.connect.errors.ConnectException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.*

class DebeziumMessageProducer<T>(private val cdcStateHandler: CdcStateHandler,
targetPosition: CdcTargetPosition<T>,
eventConverter: DebeziumEventConverter,
offsetManager: AirbyteFileOffsetBackingStore?,
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>) : SourceStateMessageProducer<ChangeEventWithMetadata> {
/**
* `checkpointOffsetToSend` is used as temporal storage for the offset that we want to send as
* message. As Debezium is reading records faster that we process them, if we try to send
* `offsetManger.read()` offset, it is possible that the state is behind the record we are currently
* propagating. To avoid that, we store the offset as soon as we reach the checkpoint threshold
* (time or records) and we wait to send it until we are sure that the record we are processing is
* behind the offset to be sent.
*/
private val checkpointOffsetToSend = HashMap<String, String>()

/**
* `previousCheckpointOffset` is used to make sure we don't send duplicated states with the same
* offset. Is it possible that the offset Debezium report doesn't move for a period of time, and if
* we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states, generating
* an unneeded usage of networking and processing.
*/
private val initialOffset: HashMap<String, String>
private val previousCheckpointOffset: HashMap<String?, String?>
private val offsetManager: AirbyteFileOffsetBackingStore?
private val targetPosition: CdcTargetPosition<T>
private val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>

private var shouldEmitStateMessage = false

private val eventConverter: DebeziumEventConverter

init {
this.targetPosition = targetPosition
this.eventConverter = eventConverter
this.offsetManager = offsetManager
if (offsetManager == null) {
throw RuntimeException("Offset manager cannot be null")
}
this.schemaHistoryManager = schemaHistoryManager
this.previousCheckpointOffset = offsetManager.read() as HashMap<String?, String?>
this.initialOffset = HashMap(this.previousCheckpointOffset)
}

override fun generateStateMessageAtCheckpoint(stream: ConfiguredAirbyteStream?): AirbyteStateMessage {
LOGGER.info("Sending CDC checkpoint state message.")
val stateMessage = createStateMessage(checkpointOffsetToSend)
previousCheckpointOffset.clear()
previousCheckpointOffset.putAll(checkpointOffsetToSend)
checkpointOffsetToSend.clear()
shouldEmitStateMessage = false
return stateMessage
}

/**
* @param stream
* @param message
* @return
*/
override fun processRecordMessage(stream: ConfiguredAirbyteStream?, message: ChangeEventWithMetadata): AirbyteMessage {
if (checkpointOffsetToSend.isEmpty()) {
try {
val temporalOffset = offsetManager!!.read()
if (!targetPosition.isSameOffset(previousCheckpointOffset, temporalOffset)) {
checkpointOffsetToSend.putAll(temporalOffset)
}
} catch (e: ConnectException) {
LOGGER.warn("Offset file is being written by Debezium. Skipping CDC checkpoint in this loop.")
}
}

if (checkpointOffsetToSend.size == 1 && !message!!.isSnapshotEvent) {
if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, message)) {
shouldEmitStateMessage = true
} else {
LOGGER.info("Encountered records with the same event offset.")
}
}

return eventConverter.toAirbyteMessage(message!!)
}

override fun createFinalStateMessage(stream: ConfiguredAirbyteStream?): AirbyteStateMessage {
val syncFinishedOffset = offsetManager!!.read()
if (targetPosition.isSameOffset(initialOffset, syncFinishedOffset)) {
// Edge case where no progress has been made: wrap up the
// sync by returning the initial offset instead of the
// current offset. We do this because we found that
// for some databases, heartbeats will cause Debezium to
// overwrite the offset file with a state which doesn't
// include all necessary data such as snapshot completion.
// This is the case for MS SQL Server, at least.
return createStateMessage(initialOffset)
}
return createStateMessage(syncFinishedOffset)
}

override fun shouldEmitStateMessage(stream: ConfiguredAirbyteStream?): Boolean {
return shouldEmitStateMessage
}

/**
* Creates [AirbyteStateMessage] while updating CDC data, used to checkpoint the state of the
* process.
*
* @return [AirbyteStateMessage] which includes offset and schema history if used.
*/
private fun createStateMessage(offset: Map<String, String>): AirbyteStateMessage {
val message =
cdcStateHandler.saveState(offset, schemaHistoryManager.map { obj: AirbyteSchemaHistoryStorage -> obj.read() }.orElse(null))!!.state
return message
}

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(DebeziumMessageProducer::class.java)
}
}
Loading