@@ -14,34 +14,43 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
14
14
import io.airbyte.protocol.models.v0.SyncMode
15
15
import io.debezium.engine.ChangeEvent
16
16
import io.debezium.engine.DebeziumEngine
17
- import org.slf4j.Logger
18
- import org.slf4j.LoggerFactory
19
17
import java.time.Duration
20
18
import java.time.Instant
21
19
import java.time.temporal.ChronoUnit
22
20
import java.util.*
23
21
import java.util.concurrent.LinkedBlockingQueue
22
+ import org.slf4j.Logger
23
+ import org.slf4j.LoggerFactory
24
24
25
25
/* *
26
26
* This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
27
27
* to use debezium for CDC, it should use this class
28
28
*/
29
- class AirbyteDebeziumHandler <T >(private val config : JsonNode ,
30
- private val targetPosition : CdcTargetPosition <T >,
31
- private val trackSchemaHistory : Boolean ,
32
- private val firstRecordWaitTime : Duration ,
33
- private val subsequentRecordWaitTime : Duration ,
34
- private val queueSize : Int ,
35
- private val addDbNameToOffsetState : Boolean ) {
36
- internal inner class CapacityReportingBlockingQueue <E >(capacity : Int ) : LinkedBlockingQueue<E>(capacity) {
29
+ class AirbyteDebeziumHandler <T >(
30
+ private val config : JsonNode ,
31
+ private val targetPosition : CdcTargetPosition <T >,
32
+ private val trackSchemaHistory : Boolean ,
33
+ private val firstRecordWaitTime : Duration ,
34
+ private val subsequentRecordWaitTime : Duration ,
35
+ private val queueSize : Int ,
36
+ private val addDbNameToOffsetState : Boolean
37
+ ) {
38
+ internal inner class CapacityReportingBlockingQueue <E >(capacity : Int ) :
39
+ LinkedBlockingQueue <E >(capacity) {
37
40
private var lastReport: Instant ? = null
38
41
39
42
private fun reportQueueUtilization () {
40
- if (lastReport == null || Duration .between(lastReport, Instant .now()).compareTo(Companion .REPORT_DURATION ) > 0 ) {
41
- LOGGER .info(" CDC events queue size: {}. remaining {}" , this .size, this .remainingCapacity())
42
- synchronized(this ) {
43
- lastReport = Instant .now()
44
- }
43
+ if (
44
+ lastReport == null ||
45
+ Duration .between(lastReport, Instant .now())
46
+ .compareTo(Companion .REPORT_DURATION ) > 0
47
+ ) {
48
+ LOGGER .info(
49
+ " CDC events queue size: {}. remaining {}" ,
50
+ this .size,
51
+ this .remainingCapacity()
52
+ )
53
+ synchronized(this ) { lastReport = Instant .now() }
45
54
}
46
55
}
47
56
@@ -55,44 +64,62 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
55
64
reportQueueUtilization()
56
65
return super .poll()
57
66
}
58
-
59
- companion object {
60
- private val REPORT_DURATION : Duration = Duration .of(10 , ChronoUnit .SECONDS )
61
- }
62
67
}
63
68
64
- fun getIncrementalIterators (debeziumPropertiesManager : DebeziumPropertiesManager ,
65
- eventConverter : DebeziumEventConverter ,
66
- cdcSavedInfoFetcher : CdcSavedInfoFetcher ,
67
- cdcStateHandler : CdcStateHandler ): AutoCloseableIterator <AirbyteMessage > {
69
+ fun getIncrementalIterators (
70
+ debeziumPropertiesManager : DebeziumPropertiesManager ,
71
+ eventConverter : DebeziumEventConverter ,
72
+ cdcSavedInfoFetcher : CdcSavedInfoFetcher ,
73
+ cdcStateHandler : CdcStateHandler
74
+ ): AutoCloseableIterator <AirbyteMessage > {
68
75
LOGGER .info(" Using CDC: {}" , true )
69
- LOGGER .info(" Using DBZ version: {}" , DebeziumEngine ::class .java.getPackage().implementationVersion)
70
- val offsetManager: AirbyteFileOffsetBackingStore = AirbyteFileOffsetBackingStore .Companion .initializeState(
76
+ LOGGER .info(
77
+ " Using DBZ version: {}" ,
78
+ DebeziumEngine ::class .java.getPackage().implementationVersion
79
+ )
80
+ val offsetManager: AirbyteFileOffsetBackingStore =
81
+ AirbyteFileOffsetBackingStore .Companion .initializeState(
71
82
cdcSavedInfoFetcher.savedOffset,
72
- if (addDbNameToOffsetState) Optional .ofNullable<String >(config[JdbcUtils .DATABASE_KEY ].asText()) else Optional .empty<String >())
73
- val schemaHistoryManager: Optional <AirbyteSchemaHistoryStorage ?> = if (trackSchemaHistory
74
- ) Optional .of<AirbyteSchemaHistoryStorage ?>(AirbyteSchemaHistoryStorage .Companion .initializeDBHistory(
75
- cdcSavedInfoFetcher.savedSchemaHistory, cdcStateHandler.compressSchemaHistoryForState()))
76
- else Optional .empty<AirbyteSchemaHistoryStorage >()
83
+ if (addDbNameToOffsetState)
84
+ Optional .ofNullable<String >(config[JdbcUtils .DATABASE_KEY ].asText())
85
+ else Optional .empty<String >()
86
+ )
87
+ val schemaHistoryManager: Optional <AirbyteSchemaHistoryStorage > =
88
+ if (trackSchemaHistory)
89
+ Optional .of<AirbyteSchemaHistoryStorage ?>(
90
+ AirbyteSchemaHistoryStorage .Companion .initializeDBHistory(
91
+ cdcSavedInfoFetcher.savedSchemaHistory,
92
+ cdcStateHandler.compressSchemaHistoryForState()
93
+ )
94
+ )
95
+ else Optional .empty<AirbyteSchemaHistoryStorage >()
77
96
val publisher = DebeziumRecordPublisher (debeziumPropertiesManager)
78
- val queue: CapacityReportingBlockingQueue <ChangeEvent <String ?, String ?>> = CapacityReportingBlockingQueue <ChangeEvent <String , String >>(queueSize)
97
+ val queue: CapacityReportingBlockingQueue <ChangeEvent <String ?, String ?>> =
98
+ CapacityReportingBlockingQueue (queueSize)
79
99
publisher.start(queue, offsetManager, schemaHistoryManager)
80
100
// handle state machine around pub/sub logic.
81
- val eventIterator: AutoCloseableIterator <ChangeEventWithMetadata > = DebeziumRecordIterator (
101
+ val eventIterator: AutoCloseableIterator <ChangeEventWithMetadata > =
102
+ DebeziumRecordIterator (
82
103
queue,
83
104
targetPosition,
84
105
{ publisher.hasClosed() },
85
106
DebeziumShutdownProcedure (queue, { publisher.close() }, { publisher.hasClosed() }),
86
107
firstRecordWaitTime,
87
- subsequentRecordWaitTime)
108
+ subsequentRecordWaitTime
109
+ )
88
110
89
- val syncCheckpointDuration = if (config.has(DebeziumIteratorConstants .SYNC_CHECKPOINT_DURATION_PROPERTY )
90
- ) Duration .ofSeconds(config[DebeziumIteratorConstants .SYNC_CHECKPOINT_DURATION_PROPERTY ].asLong())
91
- else DebeziumIteratorConstants .SYNC_CHECKPOINT_DURATION
92
- val syncCheckpointRecords = if (config.has(DebeziumIteratorConstants .SYNC_CHECKPOINT_RECORDS_PROPERTY )
93
- ) config[DebeziumIteratorConstants .SYNC_CHECKPOINT_RECORDS_PROPERTY ].asLong()
94
- else DebeziumIteratorConstants .SYNC_CHECKPOINT_RECORDS .toLong()
95
- return AutoCloseableIterators .fromIterator(DebeziumStateDecoratingIterator (
111
+ val syncCheckpointDuration =
112
+ if (config.has(DebeziumIteratorConstants .SYNC_CHECKPOINT_DURATION_PROPERTY ))
113
+ Duration .ofSeconds(
114
+ config[DebeziumIteratorConstants .SYNC_CHECKPOINT_DURATION_PROPERTY ].asLong()
115
+ )
116
+ else DebeziumIteratorConstants .SYNC_CHECKPOINT_DURATION
117
+ val syncCheckpointRecords =
118
+ if (config.has(DebeziumIteratorConstants .SYNC_CHECKPOINT_RECORDS_PROPERTY ))
119
+ config[DebeziumIteratorConstants .SYNC_CHECKPOINT_RECORDS_PROPERTY ].asLong()
120
+ else DebeziumIteratorConstants .SYNC_CHECKPOINT_RECORDS .toLong()
121
+ return AutoCloseableIterators .fromIterator(
122
+ DebeziumStateDecoratingIterator (
96
123
eventIterator,
97
124
cdcStateHandler,
98
125
targetPosition,
@@ -101,11 +128,14 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
101
128
trackSchemaHistory,
102
129
schemaHistoryManager.orElse(null ),
103
130
syncCheckpointDuration,
104
- syncCheckpointRecords))
131
+ syncCheckpointRecords
132
+ )
133
+ )
105
134
}
106
135
107
136
companion object {
108
137
private val LOGGER : Logger = LoggerFactory .getLogger(AirbyteDebeziumHandler ::class .java)
138
+ private val REPORT_DURATION : Duration = Duration .of(10 , ChronoUnit .SECONDS )
109
139
110
140
/* *
111
141
* We use 10000 as capacity cause the default queue size and batch size of debezium is :
@@ -115,8 +145,10 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
115
145
const val QUEUE_CAPACITY : Int = 10000
116
146
117
147
fun isAnyStreamIncrementalSyncMode (catalog : ConfiguredAirbyteCatalog ): Boolean {
118
- return catalog.streams.stream().map { obj: ConfiguredAirbyteStream -> obj.syncMode }
119
- .anyMatch { syncMode: SyncMode -> syncMode == SyncMode .INCREMENTAL }
148
+ return catalog.streams
149
+ .stream()
150
+ .map { obj: ConfiguredAirbyteStream -> obj.syncMode }
151
+ .anyMatch { syncMode: SyncMode -> syncMode == SyncMode .INCREMENTAL }
120
152
}
121
153
}
122
154
}
0 commit comments