|
1 |
| -/* |
2 |
| - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. |
3 |
| - */ |
4 |
| - |
5 |
| -package io.airbyte.cdk.integrations.debezium; |
6 |
| - |
7 |
| -import static io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants.*; |
8 |
| - |
9 |
| -import com.fasterxml.jackson.databind.JsonNode; |
10 |
| -import io.airbyte.cdk.db.jdbc.JdbcUtils; |
11 |
| -import io.airbyte.cdk.integrations.debezium.internals.*; |
12 |
| -import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator; |
13 |
| -import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency; |
14 |
| -import io.airbyte.commons.util.AutoCloseableIterator; |
15 |
| -import io.airbyte.commons.util.AutoCloseableIterators; |
16 |
| -import io.airbyte.protocol.models.v0.AirbyteMessage; |
17 |
| -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; |
18 |
| -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; |
19 |
| -import io.airbyte.protocol.models.v0.SyncMode; |
20 |
| -import io.debezium.engine.ChangeEvent; |
21 |
| -import io.debezium.engine.DebeziumEngine; |
22 |
| -import java.time.Duration; |
23 |
| -import java.time.Instant; |
24 |
| -import java.time.temporal.ChronoUnit; |
25 |
| -import java.util.Optional; |
26 |
| -import java.util.concurrent.LinkedBlockingQueue; |
27 |
| -import org.slf4j.Logger; |
28 |
| -import org.slf4j.LoggerFactory; |
29 |
| - |
30 |
| -/** |
31 |
| - * This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants |
32 |
| - * to use debezium for CDC, it should use this class |
33 |
| - */ |
34 |
| -public class AirbyteDebeziumHandler<T> { |
35 |
| - |
36 |
| - private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteDebeziumHandler.class); |
37 |
| - /** |
38 |
| - * We use 10000 as capacity cause the default queue size and batch size of debezium is : |
39 |
| - * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE}is 2048 |
40 |
| - * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192 |
41 |
| - */ |
42 |
| - public static final int QUEUE_CAPACITY = 10_000; |
43 |
| - |
44 |
| - private final JsonNode config; |
45 |
| - private final CdcTargetPosition<T> targetPosition; |
46 |
| - private final boolean trackSchemaHistory; |
47 |
| - private final Duration firstRecordWaitTime, subsequentRecordWaitTime; |
48 |
| - private final int queueSize; |
49 |
| - private final boolean addDbNameToOffsetState; |
50 |
| - |
51 |
| - public AirbyteDebeziumHandler(final JsonNode config, |
52 |
| - final CdcTargetPosition<T> targetPosition, |
53 |
| - final boolean trackSchemaHistory, |
54 |
| - final Duration firstRecordWaitTime, |
55 |
| - final Duration subsequentRecordWaitTime, |
56 |
| - final int queueSize, |
57 |
| - final boolean addDbNameToOffsetState) { |
58 |
| - this.config = config; |
59 |
| - this.targetPosition = targetPosition; |
60 |
| - this.trackSchemaHistory = trackSchemaHistory; |
61 |
| - this.firstRecordWaitTime = firstRecordWaitTime; |
62 |
| - this.subsequentRecordWaitTime = subsequentRecordWaitTime; |
63 |
| - this.queueSize = queueSize; |
64 |
| - this.addDbNameToOffsetState = addDbNameToOffsetState; |
65 |
| - } |
66 |
| - |
67 |
| - class CapacityReportingBlockingQueue<E> extends LinkedBlockingQueue<E> { |
68 |
| - |
69 |
| - private static Duration REPORT_DURATION = Duration.of(10, ChronoUnit.SECONDS); |
70 |
| - private Instant lastReport; |
71 |
| - |
72 |
| - CapacityReportingBlockingQueue(final int capacity) { |
73 |
| - super(capacity); |
74 |
| - } |
75 |
| - |
76 |
| - private void reportQueueUtilization() { |
77 |
| - if (lastReport == null || Duration.between(lastReport, Instant.now()).compareTo(REPORT_DURATION) > 0) { |
78 |
| - LOGGER.info("CDC events queue size: {}. remaining {}", this.size(), this.remainingCapacity()); |
79 |
| - synchronized (this) { |
80 |
| - lastReport = Instant.now(); |
81 |
| - } |
82 |
| - } |
83 |
| - } |
84 |
| - |
85 |
| - @Override |
86 |
| - public void put(final E e) throws InterruptedException { |
87 |
| - reportQueueUtilization(); |
88 |
| - super.put(e); |
89 |
| - } |
90 |
| - |
91 |
| - @Override |
92 |
| - public E poll() { |
93 |
| - reportQueueUtilization(); |
94 |
| - return super.poll(); |
95 |
| - } |
96 |
| - |
97 |
| - } |
98 |
| - |
99 |
| - public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final DebeziumPropertiesManager debeziumPropertiesManager, |
100 |
| - final DebeziumEventConverter eventConverter, |
101 |
| - final CdcSavedInfoFetcher cdcSavedInfoFetcher, |
102 |
| - final CdcStateHandler cdcStateHandler) { |
103 |
| - LOGGER.info("Using CDC: {}", true); |
104 |
| - LOGGER.info("Using DBZ version: {}", DebeziumEngine.class.getPackage().getImplementationVersion()); |
105 |
| - final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState( |
106 |
| - cdcSavedInfoFetcher.getSavedOffset(), |
107 |
| - addDbNameToOffsetState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty()); |
108 |
| - final var schemaHistoryManager = trackSchemaHistory |
109 |
| - ? Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory( |
110 |
| - cdcSavedInfoFetcher.getSavedSchemaHistory(), cdcStateHandler.compressSchemaHistoryForState())) |
111 |
| - : Optional.<AirbyteSchemaHistoryStorage>empty(); |
112 |
| - final var publisher = new DebeziumRecordPublisher(debeziumPropertiesManager); |
113 |
| - final var queue = new CapacityReportingBlockingQueue<ChangeEvent<String, String>>(queueSize); |
114 |
| - publisher.start(queue, offsetManager, schemaHistoryManager); |
115 |
| - // handle state machine around pub/sub logic. |
116 |
| - final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>( |
117 |
| - queue, |
118 |
| - targetPosition, |
119 |
| - publisher::hasClosed, |
120 |
| - new DebeziumShutdownProcedure<>(queue, publisher::close, publisher::hasClosed), |
121 |
| - firstRecordWaitTime, |
122 |
| - subsequentRecordWaitTime); |
123 |
| - |
124 |
| - final Duration syncCheckpointDuration = config.has(SYNC_CHECKPOINT_DURATION_PROPERTY) |
125 |
| - ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong()) |
126 |
| - : SYNC_CHECKPOINT_DURATION; |
127 |
| - final Long syncCheckpointRecords = config.has(SYNC_CHECKPOINT_RECORDS_PROPERTY) |
128 |
| - ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong() |
129 |
| - : SYNC_CHECKPOINT_RECORDS; |
130 |
| - |
131 |
| - DebeziumMessageProducer messageProducer = new DebeziumMessageProducer(cdcStateHandler, |
132 |
| - targetPosition, |
133 |
| - eventConverter, |
134 |
| - offsetManager, |
135 |
| - schemaHistoryManager); |
136 |
| - |
137 |
| - // Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is not used |
138 |
| - // at all thus we will pass in null. |
139 |
| - SourceStateIterator iterator = |
140 |
| - new SourceStateIterator<>(eventIterator, null, messageProducer, new StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)); |
141 |
| - return AutoCloseableIterators.fromIterator(iterator); |
142 |
| - } |
143 |
| - |
144 |
| - public static boolean isAnyStreamIncrementalSyncMode(final ConfiguredAirbyteCatalog catalog) { |
145 |
| - return catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode) |
146 |
| - .anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL); |
147 |
| - } |
148 |
| - |
149 |
| -} |
0 commit comments