diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.java index 8b137891791fe..e69de29bb2d1d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.java @@ -1 +0,0 @@ - diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/CdcMetadataInjector.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/CdcMetadataInjector.java index 8b137891791fe..e69de29bb2d1d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/CdcMetadataInjector.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/CdcMetadataInjector.java @@ -1 +0,0 @@ - diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.java deleted file mode 100644 index 4b8c7c65d1765..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.cdk.integrations.debezium.internals; - -import io.airbyte.cdk.integrations.debezium.CdcStateHandler; -import io.airbyte.cdk.integrations.debezium.CdcTargetPosition; -import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteStateMessage; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import org.apache.kafka.connect.errors.ConnectException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DebeziumMessageProducer implements SourceStateMessageProducer { - - private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumMessageProducer.class); - - private final CdcStateHandler cdcStateHandler; - - /** - * `checkpointOffsetToSend` is used as temporal storage for the offset that we want to send as - * message. As Debezium is reading records faster that we process them, if we try to send - * `offsetManger.read()` offset, it is possible that the state is behind the record we are currently - * propagating. To avoid that, we store the offset as soon as we reach the checkpoint threshold - * (time or records) and we wait to send it until we are sure that the record we are processing is - * behind the offset to be sent. - */ - private final HashMap checkpointOffsetToSend = new HashMap<>(); - - /** - * `previousCheckpointOffset` is used to make sure we don't send duplicated states with the same - * offset. Is it possible that the offset Debezium report doesn't move for a period of time, and if - * we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states, generating - * an unneeded usage of networking and processing. 
- */ - private final HashMap initialOffset, previousCheckpointOffset; - private final AirbyteFileOffsetBackingStore offsetManager; - private final CdcTargetPosition targetPosition; - private final Optional schemaHistoryManager; - - private boolean shouldEmitStateMessage = false; - - private final DebeziumEventConverter eventConverter; - - public DebeziumMessageProducer( - final CdcStateHandler cdcStateHandler, - final CdcTargetPosition targetPosition, - final DebeziumEventConverter eventConverter, - final AirbyteFileOffsetBackingStore offsetManager, - final Optional schemaHistoryManager) { - this.cdcStateHandler = cdcStateHandler; - this.targetPosition = targetPosition; - this.eventConverter = eventConverter; - this.offsetManager = offsetManager; - if (offsetManager == null) { - throw new RuntimeException("Offset manager cannot be null"); - } - this.schemaHistoryManager = schemaHistoryManager; - this.previousCheckpointOffset = (HashMap) offsetManager.read(); - this.initialOffset = new HashMap<>(this.previousCheckpointOffset); - } - - @Override - public AirbyteStateMessage generateStateMessageAtCheckpoint(ConfiguredAirbyteStream stream) { - LOGGER.info("Sending CDC checkpoint state message."); - final AirbyteStateMessage stateMessage = createStateMessage(checkpointOffsetToSend); - previousCheckpointOffset.clear(); - previousCheckpointOffset.putAll(checkpointOffsetToSend); - checkpointOffsetToSend.clear(); - shouldEmitStateMessage = false; - return stateMessage; - } - - /** - * @param stream - * @param message - * @return - */ - @Override - public AirbyteMessage processRecordMessage(ConfiguredAirbyteStream stream, ChangeEventWithMetadata message) { - - if (checkpointOffsetToSend.isEmpty()) { - try { - final HashMap temporalOffset = (HashMap) offsetManager.read(); - if (!targetPosition.isSameOffset(previousCheckpointOffset, temporalOffset)) { - checkpointOffsetToSend.putAll(temporalOffset); - } - } catch (final ConnectException e) { - LOGGER.warn("Offset file is being 
written by Debezium. Skipping CDC checkpoint in this loop."); - } - } - - if (checkpointOffsetToSend.size() == 1 && !message.isSnapshotEvent()) { - if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, message)) { - shouldEmitStateMessage = true; - } else { - LOGGER.info("Encountered records with the same event offset."); - } - } - - return eventConverter.toAirbyteMessage(message); - } - - @Override - public AirbyteStateMessage createFinalStateMessage(ConfiguredAirbyteStream stream) { - - final var syncFinishedOffset = (HashMap) offsetManager.read(); - if (targetPosition.isSameOffset(initialOffset, syncFinishedOffset)) { - // Edge case where no progress has been made: wrap up the - // sync by returning the initial offset instead of the - // current offset. We do this because we found that - // for some databases, heartbeats will cause Debezium to - // overwrite the offset file with a state which doesn't - // include all necessary data such as snapshot completion. - // This is the case for MS SQL Server, at least. - return createStateMessage(initialOffset); - } - return createStateMessage(syncFinishedOffset); - } - - @Override - public boolean shouldEmitStateMessage(ConfiguredAirbyteStream stream) { - return shouldEmitStateMessage; - } - - /** - * Creates {@link AirbyteStateMessage} while updating CDC data, used to checkpoint the state of the - * process. - * - * @return {@link AirbyteStateMessage} which includes offset and schema history if used. 
- */ - private AirbyteStateMessage createStateMessage(final Map offset) { - final AirbyteStateMessage message = - cdcStateHandler.saveState(offset, schemaHistoryManager.map(AirbyteSchemaHistoryStorage::read).orElse(null)).getState(); - return message; - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt index 2d619a02831d4..eb3224adb8cb9 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt @@ -6,6 +6,8 @@ package io.airbyte.cdk.integrations.debezium import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.db.jdbc.JdbcUtils import io.airbyte.cdk.integrations.debezium.internals.* +import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator +import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency import io.airbyte.commons.util.AutoCloseableIterator import io.airbyte.commons.util.AutoCloseableIterators import io.airbyte.protocol.models.v0.AirbyteMessage @@ -14,13 +16,14 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.SyncMode import io.debezium.engine.ChangeEvent import io.debezium.engine.DebeziumEngine +import org.slf4j.Logger +import org.slf4j.LoggerFactory import java.time.Duration import java.time.Instant import java.time.temporal.ChronoUnit import java.util.* import java.util.concurrent.LinkedBlockingQueue -import org.slf4j.Logger -import org.slf4j.LoggerFactory + /** * This class acts as the bridge between Airbyte DB connectors and debezium. 
If a DB connector wants @@ -118,19 +121,19 @@ class AirbyteDebeziumHandler( if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY)) config[DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY].asLong() else DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS.toLong() - return AutoCloseableIterators.fromIterator( - DebeziumStateDecoratingIterator( - eventIterator, - cdcStateHandler, + + val messageProducer: DebeziumMessageProducer = DebeziumMessageProducer(cdcStateHandler, targetPosition, eventConverter, offsetManager, - trackSchemaHistory, - schemaHistoryManager.orElse(null), - syncCheckpointDuration, - syncCheckpointRecords - ) - ) + schemaHistoryManager) + + + // Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is not used + // at all thus we will pass in null. + val iterator: SourceStateIterator = + SourceStateIterator(eventIterator, null, messageProducer!!, StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)) + return AutoCloseableIterators.fromIterator(iterator) } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcStateHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcStateHandler.kt index 317c87e1cfbcc..976c97952a1da 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcStateHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcStateHandler.kt @@ -12,7 +12,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage */ interface CdcStateHandler { fun saveState( - offset: Map?, + offset: Map, dbHistory: AirbyteSchemaHistoryStorage.SchemaHistory? ): AirbyteMessage? 
diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt index f0a8e12dad5a5..ccff3d3b4d1fc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt @@ -58,7 +58,7 @@ interface CdcTargetPosition { * @return Returns `true` when the event is ahead of the offset. Otherwise, it returns `false` */ fun isEventAheadOffset( - offset: Map?, + offset: Map?, event: ChangeEventWithMetadata? ): Boolean { return false @@ -73,7 +73,7 @@ interface CdcTargetPosition { * @return Returns `true` if both offsets are at the same position. Otherwise, it returns * `false` */ - fun isSameOffset(offsetA: Map?, offsetB: Map?): Boolean { + fun isSameOffset(offsetA: Map<*, *>, offsetB: Map): Boolean { return false } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.kt new file mode 100644 index 0000000000000..0b5b606aff3bd --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.kt @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ +package io.airbyte.cdk.integrations.debezium.internals + +import io.airbyte.cdk.integrations.debezium.CdcStateHandler +import io.airbyte.cdk.integrations.debezium.CdcTargetPosition +import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteStateMessage +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream +import org.apache.kafka.connect.errors.ConnectException +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.util.* + +class DebeziumMessageProducer(private val cdcStateHandler: CdcStateHandler, + targetPosition: CdcTargetPosition, + eventConverter: DebeziumEventConverter, + offsetManager: AirbyteFileOffsetBackingStore?, + schemaHistoryManager: Optional) : SourceStateMessageProducer { + /** + * `checkpointOffsetToSend` is used as temporal storage for the offset that we want to send as + * message. As Debezium is reading records faster that we process them, if we try to send + * `offsetManger.read()` offset, it is possible that the state is behind the record we are currently + * propagating. To avoid that, we store the offset as soon as we reach the checkpoint threshold + * (time or records) and we wait to send it until we are sure that the record we are processing is + * behind the offset to be sent. + */ + private val checkpointOffsetToSend = HashMap() + + /** + * `previousCheckpointOffset` is used to make sure we don't send duplicated states with the same + * offset. Is it possible that the offset Debezium report doesn't move for a period of time, and if + * we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states, generating + * an unneeded usage of networking and processing. + */ + private val initialOffset: HashMap + private val previousCheckpointOffset: HashMap + private val offsetManager: AirbyteFileOffsetBackingStore? 
+ private val targetPosition: CdcTargetPosition + private val schemaHistoryManager: Optional + + private var shouldEmitStateMessage = false + + private val eventConverter: DebeziumEventConverter + + init { + this.targetPosition = targetPosition + this.eventConverter = eventConverter + this.offsetManager = offsetManager + if (offsetManager == null) { + throw RuntimeException("Offset manager cannot be null") + } + this.schemaHistoryManager = schemaHistoryManager + this.previousCheckpointOffset = offsetManager.read() as HashMap + this.initialOffset = HashMap(this.previousCheckpointOffset) + } + + override fun generateStateMessageAtCheckpoint(stream: ConfiguredAirbyteStream?): AirbyteStateMessage { + LOGGER.info("Sending CDC checkpoint state message.") + val stateMessage = createStateMessage(checkpointOffsetToSend) + previousCheckpointOffset.clear() + previousCheckpointOffset.putAll(checkpointOffsetToSend) + checkpointOffsetToSend.clear() + shouldEmitStateMessage = false + return stateMessage + } + + /** + * @param stream + * @param message + * @return + */ + override fun processRecordMessage(stream: ConfiguredAirbyteStream?, message: ChangeEventWithMetadata): AirbyteMessage { + if (checkpointOffsetToSend.isEmpty()) { + try { + val temporalOffset = offsetManager!!.read() + if (!targetPosition.isSameOffset(previousCheckpointOffset, temporalOffset)) { + checkpointOffsetToSend.putAll(temporalOffset) + } + } catch (e: ConnectException) { + LOGGER.warn("Offset file is being written by Debezium. Skipping CDC checkpoint in this loop.") + } + } + + if (checkpointOffsetToSend.size == 1 && !message!!.isSnapshotEvent) { + if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, message)) { + shouldEmitStateMessage = true + } else { + LOGGER.info("Encountered records with the same event offset.") + } + } + + return eventConverter.toAirbyteMessage(message!!) 
+ } + + override fun createFinalStateMessage(stream: ConfiguredAirbyteStream?): AirbyteStateMessage { + val syncFinishedOffset = offsetManager!!.read() + if (targetPosition.isSameOffset(initialOffset, syncFinishedOffset)) { + // Edge case where no progress has been made: wrap up the + // sync by returning the initial offset instead of the + // current offset. We do this because we found that + // for some databases, heartbeats will cause Debezium to + // overwrite the offset file with a state which doesn't + // include all necessary data such as snapshot completion. + // This is the case for MS SQL Server, at least. + return createStateMessage(initialOffset) + } + return createStateMessage(syncFinishedOffset) + } + + override fun shouldEmitStateMessage(stream: ConfiguredAirbyteStream?): Boolean { + return shouldEmitStateMessage + } + + /** + * Creates [AirbyteStateMessage] while updating CDC data, used to checkpoint the state of the + * process. + * + * @return [AirbyteStateMessage] which includes offset and schema history if used. 
+ */ + private fun createStateMessage(offset: Map): AirbyteStateMessage { + val message = + cdcStateHandler.saveState(offset, schemaHistoryManager.map { obj: AirbyteSchemaHistoryStorage -> obj.read() }.orElse(null))!!.state + return message + } + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(DebeziumMessageProducer::class.java) + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.kt deleted file mode 100644 index 2c1313f99f326..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.kt +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.cdk.integrations.debezium.internals - -import com.google.common.collect.AbstractIterator -import io.airbyte.cdk.integrations.debezium.CdcStateHandler -import io.airbyte.cdk.integrations.debezium.CdcTargetPosition -import io.airbyte.protocol.models.v0.AirbyteMessage -import io.airbyte.protocol.models.v0.AirbyteStateStats -import java.time.Duration -import java.time.OffsetDateTime -import org.apache.kafka.connect.errors.ConnectException -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -/** - * This class encapsulates CDC change events and adds the required functionality to create - * checkpoints for CDC replications. That way, if the process fails in the middle of a long sync, it - * will be able to recover for any acknowledged checkpoint in the next syncs. 
- */ -class DebeziumStateDecoratingIterator( - private val changeEventIterator: Iterator, - private val cdcStateHandler: CdcStateHandler, - private val targetPosition: CdcTargetPosition, - private val eventConverter: DebeziumEventConverter, - offsetManager: AirbyteFileOffsetBackingStore, - private val trackSchemaHistory: Boolean, - private val schemaHistoryManager: AirbyteSchemaHistoryStorage?, - checkpointDuration: Duration, - checkpointRecords: Long -) : AbstractIterator(), MutableIterator { - private val offsetManager: AirbyteFileOffsetBackingStore? = offsetManager - private var isSyncFinished = false - - /** - * These parameters control when a checkpoint message has to be sent in a CDC integration. We - * can emit a checkpoint when any of the following two conditions are met. - * - * 1. The amount of records in the current loop (`SYNC_CHECKPOINT_RECORDS`) is higher than a - * threshold defined by `SYNC_CHECKPOINT_RECORDS`. - * - * 2. Time between checkpoints (`dateTimeLastSync`) is higher than a `Duration` defined at - * `SYNC_CHECKPOINT_SECONDS`. - */ - private val syncCheckpointDuration = checkpointDuration - private val syncCheckpointRecords = checkpointRecords - private var dateTimeLastSync: OffsetDateTime? = null - private var recordsLastSync: Long = 0 - private var recordsAllSyncs: Long = 0 - private var sendCheckpointMessage = false - - /** - * `checkpointOffsetToSend` is used as temporal storage for the offset that we want to send as - * message. As Debezium is reading records faster that we process them, if we try to send - * `offsetManger.read()` offset, it is possible that the state is behind the record we are - * currently propagating. To avoid that, we store the offset as soon as we reach the checkpoint - * threshold (time or records) and we wait to send it until we are sure that the record we are - * processing is behind the offset to be sent. 
- */ - private val checkpointOffsetToSend = HashMap() - - /** - * `previousCheckpointOffset` is used to make sure we don't send duplicated states with the same - * offset. Is it possible that the offset Debezium report doesn't move for a period of time, and - * if we just rely on the `offsetManger.read()`, there is a chance to sent duplicate states, - * generating an unneeded usage of networking and processing. - */ - private val initialOffset: HashMap - private val previousCheckpointOffset: HashMap? = - offsetManager.read() as HashMap - - /** - * @param changeEventIterator Base iterator that we want to enrich with checkpoint messages - * @param cdcStateHandler Handler to save the offset and schema history - * @param offsetManager Handler to read and write debezium offset file - * @param eventConverter Handler to transform debezium events into Airbyte messages. - * @param trackSchemaHistory Set true if the schema needs to be tracked - * @param schemaHistoryManager Handler to write schema. Needs to be initialized if - * trackSchemaHistory is set to true - * @param checkpointDuration Duration object with time between syncs - * @param checkpointRecords Number of records between syncs - */ - init { - this.initialOffset = HashMap(this.previousCheckpointOffset) - resetCheckpointValues() - } - - /** - * Computes the next record retrieved from Source stream. Emits state messages as checkpoints - * based on number of records or time lapsed. - * - * If this method throws an exception, it will propagate outward to the `hasNext` or `next` - * invocation that invoked this method. Any further attempts to use the iterator will result in - * an [IllegalStateException]. - * - * @return [AirbyteStateMessage] containing CDC data or state checkpoint message. - */ - override fun computeNext(): AirbyteMessage? 
{ - if (isSyncFinished) { - return endOfData() - } - - if (cdcStateHandler.isCdcCheckpointEnabled && sendCheckpointMessage) { - LOGGER.info("Sending CDC checkpoint state message.") - val stateMessage = createStateMessage(checkpointOffsetToSend, recordsLastSync) - previousCheckpointOffset!!.clear() - previousCheckpointOffset.putAll(checkpointOffsetToSend) - resetCheckpointValues() - return stateMessage - } - - if (changeEventIterator.hasNext()) { - val event = changeEventIterator.next() - - if (cdcStateHandler.isCdcCheckpointEnabled) { - if ( - checkpointOffsetToSend.isEmpty() && - (recordsLastSync >= syncCheckpointRecords || - Duration.between(dateTimeLastSync, OffsetDateTime.now()) - .compareTo(syncCheckpointDuration) > 0) - ) { - // Using temporal variable to avoid reading teh offset twice, one in the - // condition and another in - // the assignation - try { - val temporalOffset = offsetManager!!.read() as HashMap - if ( - !targetPosition.isSameOffset(previousCheckpointOffset, temporalOffset) - ) { - checkpointOffsetToSend.putAll(temporalOffset) - } - } catch (e: ConnectException) { - LOGGER.warn( - "Offset file is being written by Debezium. Skipping CDC checkpoint in this loop." - ) - } - } - - if ( - checkpointOffsetToSend.size == 1 && - changeEventIterator.hasNext() && - !event.isSnapshotEvent - ) { - if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, event)) { - sendCheckpointMessage = true - } else { - LOGGER.info( - "Encountered {} records with the same event offset", - recordsLastSync - ) - } - } - } - recordsLastSync++ - recordsAllSyncs++ - return eventConverter.toAirbyteMessage(event) - } - - isSyncFinished = true - val syncFinishedOffset = offsetManager!!.read() as HashMap - if ( - recordsAllSyncs == 0L && targetPosition.isSameOffset(initialOffset, syncFinishedOffset) - ) { - // Edge case where no progress has been made: wrap up the - // sync by returning the initial offset instead of the - // current offset. 
We do this because we found that - // for some databases, heartbeats will cause Debezium to - // overwrite the offset file with a state which doesn't - // include all necessary data such as snapshot completion. - // This is the case for MS SQL Server, at least. - return createStateMessage(initialOffset, 0) - } - return createStateMessage(syncFinishedOffset, recordsLastSync) - } - - /** Initialize or reset the checkpoint variables. */ - private fun resetCheckpointValues() { - sendCheckpointMessage = false - checkpointOffsetToSend.clear() - recordsLastSync = 0L - dateTimeLastSync = OffsetDateTime.now() - } - - /** - * Creates [AirbyteStateMessage] while updating CDC data, used to checkpoint the state of the - * process. - * - * @return [AirbyteStateMessage] which includes offset and schema history if used. - */ - private fun createStateMessage( - offset: Map?, - recordCount: Long - ): AirbyteMessage? { - if (trackSchemaHistory && schemaHistoryManager == null) { - throw RuntimeException("Schema History Tracking is true but manager is not initialised") - } - if (offsetManager == null) { - throw RuntimeException("Offset can not be null") - } - - val message = cdcStateHandler.saveState(offset, schemaHistoryManager?.read()) - message!!.state.withSourceStats(AirbyteStateStats().withRecordCount(recordCount.toDouble())) - return message - } - - companion object { - private val LOGGER: Logger = - LoggerFactory.getLogger(DebeziumStateDecoratingIterator::class.java) - } -} diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducer.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducer.kt index d52305abb105d..3c8d4dc6f50d6 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducer.kt +++ 
b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducer.kt @@ -53,10 +53,10 @@ class CursorStateMessageProducer( * sync if we have fixed the underlying issue, of if the issue is transient. */ override fun processRecordMessage( - stream: ConfiguredAirbyteStream, + stream: ConfiguredAirbyteStream?, message: AirbyteMessage ): AirbyteMessage { - val cursorField = getCursorField(stream) + val cursorField = getCursorField(stream!!) if (message.record.data.hasNonNull(cursorField)) { val cursorCandidate = getCursorCandidate(cursorField, message) val cursorType = getCursorType(stream, cursorField) @@ -83,8 +83,8 @@ class CursorStateMessageProducer( return message } - override fun createFinalStateMessage(stream: ConfiguredAirbyteStream): AirbyteStateMessage? { - return createStateMessage(stream) + override fun createFinalStateMessage(stream: ConfiguredAirbyteStream?): AirbyteStateMessage? { + return createStateMessage(stream!!) } /** Only sends out state message when there is a state message to be sent out. 
*/ diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt index ef5758562854b..c4775a4074212 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt @@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory open class SourceStateIterator( private val messageIterator: Iterator, - private val stream: ConfiguredAirbyteStream, + private val stream: ConfiguredAirbyteStream?, private val sourceStateMessageProducer: SourceStateMessageProducer, private val stateEmitFrequency: StateEmitFrequency ) : AbstractIterator(), MutableIterator { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateMessageProducer.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateMessageProducer.kt index 4c70a0b0a2b7b..7c2fd5bc7c44b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateMessageProducer.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateMessageProducer.kt @@ -19,7 +19,7 @@ interface SourceStateMessageProducer { fun generateStateMessageAtCheckpoint(stream: ConfiguredAirbyteStream?): AirbyteStateMessage? /** For the incoming record message, this method defines how the connector will consume it. 
*/ - fun processRecordMessage(stream: ConfiguredAirbyteStream, message: T): AirbyteMessage + fun processRecordMessage(stream: ConfiguredAirbyteStream?, message: T): AirbyteMessage /** * At the end of the iteration, this method will be called and it will generate the final state @@ -27,7 +27,7 @@ interface SourceStateMessageProducer { * * @return */ - fun createFinalStateMessage(stream: ConfiguredAirbyteStream): AirbyteStateMessage? + fun createFinalStateMessage(stream: ConfiguredAirbyteStream?): AirbyteStateMessage? /** * Determines if the iterator has reached checkpoint or not per connector's definition. By diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducerTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducerTest.java deleted file mode 100644 index 5a9a4b9a9f845..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducerTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
- */
-
-package io.airbyte.cdk.integrations.debezium.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import io.airbyte.cdk.integrations.debezium.CdcStateHandler;
-import io.airbyte.cdk.integrations.debezium.CdcTargetPosition;
-import io.airbyte.protocol.models.v0.AirbyteMessage;
-import io.airbyte.protocol.models.v0.AirbyteStateMessage;
-import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * Unit tests for {@link DebeziumMessageProducer}. Every collaborator handed to the producer is a
- * Mockito mock created in {@link #setUp()}.
- *
- * <p>
- * NOTE(review): the {@code assertEquals} calls in this class pass their arguments as (actual,
- * expected) while {@code org.junit.Assert.assertEquals} declares (expected, actual); this only
- * affects the wording of a failure message.
- */
-public class DebeziumMessageProducerTest {
-
-  // Producer under test; rebuilt before every test by setUp().
-  private DebeziumMessageProducer producer;
-
-  // Mocked collaborators, recreated before every test by setUp().
-  CdcStateHandler cdcStateHandler;
-  CdcTargetPosition targetPosition;
-  DebeziumEventConverter eventConverter;
-  AirbyteFileOffsetBackingStore offsetManager;
-  AirbyteSchemaHistoryStorage schemaHistoryManager;
-
-  // Offsets served by the mocked offset manager: the first one from setUp(), the second one once a
-  // test re-stubs offsetManager.read().
-  private static Map OFFSET_MANAGER_READ = new HashMap<>(Map.of("key", "value"));
-  private static Map OFFSET_MANAGER_READ2 = new HashMap<>(Map.of("key2", "value2"));
-
-  // Schema history served by the mocked schemaHistoryManager.read().
-  private static AirbyteSchemaHistoryStorage.SchemaHistory SCHEMA = new AirbyteSchemaHistoryStorage.SchemaHistory("schema", false);
-
-  // State carried by the AirbyteMessage that the stubbed cdcStateHandler.saveState(...) returns.
-  private static AirbyteStateMessage STATE_MESSAGE = new AirbyteStateMessage().withType(AirbyteStateType.GLOBAL);
-
-  /**
-   * Creates fresh mocks and builds the producer under test. By default the state handler reports
-   * checkpointing as enabled, the offset manager reads {@code OFFSET_MANAGER_READ} and the schema
-   * history manager reads {@code SCHEMA}.
-   */
-  @BeforeEach
-  void setUp() {
-    cdcStateHandler = mock(CdcStateHandler.class);
-    when(cdcStateHandler.isCdcCheckpointEnabled()).thenReturn(true);
-    targetPosition = mock(CdcTargetPosition.class);
-    eventConverter = mock(DebeziumEventConverter.class);
-    offsetManager = mock(AirbyteFileOffsetBackingStore.class);
-    when(offsetManager.read()).thenReturn(OFFSET_MANAGER_READ);
-    schemaHistoryManager = mock(AirbyteSchemaHistoryStorage.class);
-    when(schemaHistoryManager.read()).thenReturn(SCHEMA);
-    producer = new DebeziumMessageProducer(cdcStateHandler, targetPosition, eventConverter, offsetManager, Optional.of(schemaHistoryManager));
-  }
-
-  /**
-   * With {@code isSameOffset} stubbed to {@code true}, processing one record hands it to the event
-   * converter and leaves {@code shouldEmitStateMessage} reporting {@code false}.
-   */
-  @Test
-  void testProcessRecordMessage() {
-    ChangeEventWithMetadata message = mock(ChangeEventWithMetadata.class);
-
-    when(targetPosition.isSameOffset(any(), any())).thenReturn(true);
-    producer.processRecordMessage(null, message);
-    verify(eventConverter).toAirbyteMessage(message);
-    assertFalse(producer.shouldEmitStateMessage(null));
-  }
-
-  /**
-   * With {@code isSameOffset} stubbed to {@code false} and the event reported as ahead of
-   * {@code OFFSET_MANAGER_READ}, processing one record makes {@code shouldEmitStateMessage} report
-   * {@code true}; the checkpoint state is then the one {@code saveState} returns for
-   * {@code OFFSET_MANAGER_READ} and {@code SCHEMA}.
-   */
-  @Test
-  void testProcessRecordMessageWithStateMessage() {
-    ChangeEventWithMetadata message = mock(ChangeEventWithMetadata.class);
-
-    when(targetPosition.isSameOffset(any(), any())).thenReturn(false);
-    when(targetPosition.isEventAheadOffset(OFFSET_MANAGER_READ, message)).thenReturn(true);
-    producer.processRecordMessage(null, message);
-    verify(eventConverter).toAirbyteMessage(message);
-    assertTrue(producer.shouldEmitStateMessage(null));
-
-    // Checkpointing is reported as disabled from here on, before the checkpoint state is generated.
-    when(cdcStateHandler.isCdcCheckpointEnabled()).thenReturn(false);
-    when(cdcStateHandler.saveState(eq(OFFSET_MANAGER_READ), eq(SCHEMA))).thenReturn(new AirbyteMessage().withState(STATE_MESSAGE));
-
-    assertEquals(producer.generateStateMessageAtCheckpoint(null), STATE_MESSAGE);
-  }
-
-  /**
-   * The offset manager now reads {@code OFFSET_MANAGER_READ2}, but {@code isSameOffset} reports it
-   * equal to the initial offset. {@code saveState} is stubbed only for {@code OFFSET_MANAGER_READ},
-   * so the assertion can pass only if the final state is built from that offset.
-   */
-  @Test
-  void testGenerateFinalMessageNoProgress() {
-    when(cdcStateHandler.saveState(eq(OFFSET_MANAGER_READ), eq(SCHEMA))).thenReturn(new AirbyteMessage().withState(STATE_MESSAGE));
-
-    // initialOffset will be OFFSET_MANAGER_READ, final state would be OFFSET_MANAGER_READ2.
-    // Mock CDC handler will only accept OFFSET_MANAGER_READ.
-    when(offsetManager.read()).thenReturn(OFFSET_MANAGER_READ2);
-
-    when(targetPosition.isSameOffset(OFFSET_MANAGER_READ, OFFSET_MANAGER_READ2)).thenReturn(true);
-
-    assertEquals(producer.createFinalStateMessage(null), STATE_MESSAGE);
-  }
-
-  /**
-   * The offset manager now reads {@code OFFSET_MANAGER_READ2} and {@code isSameOffset} reports it
-   * different from the initial offset. {@code saveState} is stubbed only for
-   * {@code OFFSET_MANAGER_READ2}, so the assertion can pass only if the final state is built from
-   * the new offset.
-   */
-  @Test
-  void testGenerateFinalMessageWithProgress() {
-    when(cdcStateHandler.saveState(eq(OFFSET_MANAGER_READ2), eq(SCHEMA))).thenReturn(new AirbyteMessage().withState(STATE_MESSAGE));
-
-    // initialOffset will be OFFSET_MANAGER_READ, final state would be OFFSET_MANAGER_READ2.
-    // Mock CDC handler will only accept OFFSET_MANAGER_READ2.
-    when(offsetManager.read()).thenReturn(OFFSET_MANAGER_READ2);
-    when(targetPosition.isSameOffset(OFFSET_MANAGER_READ, OFFSET_MANAGER_READ2)).thenReturn(false);
-
-    assertEquals(producer.createFinalStateMessage(null), STATE_MESSAGE);
-  }
-
-}
diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducerTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducerTest.kt
new file mode 100644
index 0000000000000..26ddf93031dc1
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducerTest.kt
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+package io.airbyte.cdk.integrations.debezium.internals
+
+import io.airbyte.cdk.integrations.debezium.CdcStateHandler
+import io.airbyte.cdk.integrations.debezium.CdcTargetPosition
+import io.airbyte.protocol.models.v0.AirbyteMessage
+import io.airbyte.protocol.models.v0.AirbyteStateMessage
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertFalse
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers
+import org.mockito.Mockito
+import org.mockito.kotlin.any
+import org.mockito.kotlin.mock
+import java.util.Optional
+
+/**
+ * Unit tests for [DebeziumMessageProducer]. Every collaborator handed to the producer is a Mockito
+ * mock created in [setUp].
+ */
+class DebeziumMessageProducerTest {
+    // Assigned in setUp() before every test, so `lateinit` replaces a nullable var plus `!!`.
+    private lateinit var producer: DebeziumMessageProducer<*>
+
+    lateinit var cdcStateHandler: CdcStateHandler
+
+    // NOTE(review): type argument assumed to be `Any` - confirm against CdcTargetPosition.
+    lateinit var targetPosition: CdcTargetPosition<Any>
+    lateinit var eventConverter: DebeziumEventConverter
+    lateinit var offsetManager: AirbyteFileOffsetBackingStore
+    lateinit var schemaHistoryManager: AirbyteSchemaHistoryStorage
+
+    /**
+     * Creates fresh mocks and builds the producer under test. By default the state handler reports
+     * checkpointing as enabled, the offset manager reads [OFFSET_MANAGER_READ] and the schema
+     * history manager reads [SCHEMA].
+     */
+    @BeforeEach
+    fun setUp() {
+        cdcStateHandler = mock()
+        Mockito.`when`(cdcStateHandler.isCdcCheckpointEnabled).thenReturn(true)
+        targetPosition = mock()
+        eventConverter = mock()
+        offsetManager = mock()
+        // The explicit type argument lets a read-only Map be returned from the stub.
+        Mockito.`when`<Map<String, String>>(offsetManager.read()).thenReturn(OFFSET_MANAGER_READ)
+        schemaHistoryManager = mock()
+        Mockito.`when`(schemaHistoryManager.read()).thenReturn(SCHEMA)
+        producer =
+            DebeziumMessageProducer(
+                cdcStateHandler,
+                targetPosition,
+                eventConverter,
+                offsetManager,
+                Optional.of(schemaHistoryManager)
+            )
+    }
+
+    /**
+     * With `isSameOffset` stubbed to `true`, processing one record hands it to the event converter
+     * and leaves `shouldEmitStateMessage` reporting `false`.
+     */
+    @Test
+    fun testProcessRecordMessage() {
+        val message = mock<ChangeEventWithMetadata>()
+
+        Mockito.`when`(targetPosition.isSameOffset(any(), any())).thenReturn(true)
+        producer.processRecordMessage(null, message)
+        Mockito.verify(eventConverter).toAirbyteMessage(message)
+        assertFalse(producer.shouldEmitStateMessage(null))
+    }
+
+    /**
+     * With `isSameOffset` stubbed to `false` and the event reported as ahead of
+     * [OFFSET_MANAGER_READ], processing one record makes `shouldEmitStateMessage` report `true`;
+     * the checkpoint state is then the one `saveState` returns for [OFFSET_MANAGER_READ].
+     */
+    @Test
+    fun testProcessRecordMessageWithStateMessage() {
+        val message = mock<ChangeEventWithMetadata>()
+
+        Mockito.`when`(
+                targetPosition.isSameOffset(ArgumentMatchers.any<Map<String, String>>(), any())
+            )
+            .thenReturn(false)
+        Mockito.`when`(targetPosition.isEventAheadOffset(OFFSET_MANAGER_READ, message))
+            .thenReturn(true)
+        producer.processRecordMessage(null, message)
+        Mockito.verify(eventConverter).toAirbyteMessage(message)
+        assertTrue(producer.shouldEmitStateMessage(null))
+
+        // Checkpointing is reported as disabled before the checkpoint state is generated.
+        Mockito.`when`(cdcStateHandler.isCdcCheckpointEnabled).thenReturn(false)
+        stubSaveState(OFFSET_MANAGER_READ)
+
+        assertEquals(STATE_MESSAGE, producer.generateStateMessageAtCheckpoint(null))
+    }
+
+    /**
+     * The offset manager now reads [OFFSET_MANAGER_READ2], but `isSameOffset` reports it equal to
+     * the initial offset. `saveState` is stubbed only for [OFFSET_MANAGER_READ], so the assertion
+     * can pass only if the final state is built from that offset.
+     */
+    @Test
+    fun testGenerateFinalMessageNoProgress() {
+        stubSaveState(OFFSET_MANAGER_READ)
+
+        // initialOffset will be OFFSET_MANAGER_READ, final state would be OFFSET_MANAGER_READ2.
+        // Mock CDC handler will only accept OFFSET_MANAGER_READ.
+        Mockito.`when`<Map<String, String>>(offsetManager.read()).thenReturn(OFFSET_MANAGER_READ2)
+        Mockito.`when`(targetPosition.isSameOffset(OFFSET_MANAGER_READ, OFFSET_MANAGER_READ2))
+            .thenReturn(true)
+
+        assertEquals(STATE_MESSAGE, producer.createFinalStateMessage(null))
+    }
+
+    /**
+     * The offset manager now reads [OFFSET_MANAGER_READ2] and `isSameOffset` reports it different
+     * from the initial offset. `saveState` is stubbed only for [OFFSET_MANAGER_READ2], so the
+     * assertion can pass only if the final state is built from the new offset.
+     */
+    @Test
+    fun testGenerateFinalMessageWithProgress() {
+        stubSaveState(OFFSET_MANAGER_READ2)
+
+        // initialOffset will be OFFSET_MANAGER_READ, final state would be OFFSET_MANAGER_READ2.
+        // Mock CDC handler will only accept OFFSET_MANAGER_READ2.
+        Mockito.`when`<Map<String, String>>(offsetManager.read()).thenReturn(OFFSET_MANAGER_READ2)
+        Mockito.`when`(targetPosition.isSameOffset(OFFSET_MANAGER_READ, OFFSET_MANAGER_READ2))
+            .thenReturn(false)
+
+        assertEquals(STATE_MESSAGE, producer.createFinalStateMessage(null))
+    }
+
+    /** Stubs `saveState` to return a message carrying [STATE_MESSAGE] for [offset] and [SCHEMA] only. */
+    private fun stubSaveState(offset: Map<String, String>) {
+        Mockito.`when`(
+                cdcStateHandler.saveState(ArgumentMatchers.eq(offset), ArgumentMatchers.eq(SCHEMA))
+            )
+            .thenReturn(AirbyteMessage().withState(STATE_MESSAGE))
+    }
+
+    companion object {
+        // Offsets served by the mocked offset manager: the first by default, the second once a
+        // test re-stubs offsetManager.read().
+        private val OFFSET_MANAGER_READ: Map<String, String> = hashMapOf("key" to "value")
+        private val OFFSET_MANAGER_READ2: Map<String, String> = hashMapOf("key2" to "value2")
+
+        // Type left to inference so no type argument has to be spelled out here.
+        private val SCHEMA = AirbyteSchemaHistoryStorage.SchemaHistory("schema", false)
+
+        private val STATE_MESSAGE: AirbyteStateMessage =
+            AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.GLOBAL)
+    }
+}