24
24
25
25
package io .airbyte .integrations .debezium .internals ;
26
26
27
+ import com .fasterxml .jackson .databind .JsonNode ;
27
28
import com .google .common .collect .AbstractIterator ;
28
29
import io .airbyte .commons .concurrency .VoidCallable ;
29
30
import io .airbyte .commons .json .Jsons ;
@@ -54,13 +55,14 @@ public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String,
54
55
private static final Logger LOGGER = LoggerFactory .getLogger (DebeziumRecordIterator .class );
55
56
56
57
private static final WaitTime FIRST_RECORD_WAIT_TIME_MINUTES = new WaitTime (5 , TimeUnit .MINUTES );
57
- private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime (5 , TimeUnit .SECONDS );
58
// NOTE(review): the constant name still ends in _SECONDS although its value is now expressed in
// minutes (1 minute); consider renaming it together with its usages.
+ private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime (1 , TimeUnit .MINUTES );
58
59
59
60
private final LinkedBlockingQueue <ChangeEvent <String , String >> queue ;
60
61
private final CdcTargetPosition targetPosition ;
61
62
private final Supplier <Boolean > publisherStatusSupplier ;
62
63
private final VoidCallable requestClose ;
63
64
private boolean receivedFirstRecord ;
65
+ private boolean hasSnapshotFinished ;
64
66
65
67
public DebeziumRecordIterator (LinkedBlockingQueue <ChangeEvent <String , String >> queue ,
66
68
CdcTargetPosition targetPosition ,
@@ -71,6 +73,7 @@ public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> q
71
73
this .publisherStatusSupplier = publisherStatusSupplier ;
72
74
this .requestClose = requestClose ;
73
75
this .receivedFirstRecord = false ;
76
+ this .hasSnapshotFinished = true ;
74
77
}
75
78
76
79
@ Override
@@ -90,13 +93,17 @@ protected ChangeEvent<String, String> computeNext() {
90
93
// if within the timeout, the consumer could not get a record, it is time to tell the producer to
91
94
// shutdown.
92
95
if (next == null ) {
96
+ LOGGER .info ("Closing cause next is returned as null" );
93
97
requestClose ();
94
98
LOGGER .info ("no record found. polling again." );
95
99
continue ;
96
100
}
97
101
102
+ JsonNode eventAsJson = Jsons .deserialize (next .value ());
103
+ hasSnapshotFinished = hasSnapshotFinished (eventAsJson );
104
+
98
105
// if the last record matches the target file position, it is time to tell the producer to shutdown.
99
- if (shouldSignalClose (next )) {
106
+ if (shouldSignalClose (eventAsJson )) {
100
107
requestClose ();
101
108
}
102
109
receivedFirstRecord = true ;
@@ -105,14 +112,35 @@ protected ChangeEvent<String, String> computeNext() {
105
112
return endOfData ();
106
113
}
107
114
115
+ private boolean hasSnapshotFinished (JsonNode eventAsJson ) {
116
+ SnapshotMetadata snapshot = SnapshotMetadata .valueOf (eventAsJson .get ("source" ).get ("snapshot" ).asText ().toUpperCase ());
117
+ return SnapshotMetadata .TRUE != snapshot ;
118
+ }
119
+
120
+ /**
121
+ * Debezium was built as an ever running process which keeps on listening for new changes on DB and
122
+ * immediately processing them. Airbyte needs debezium to work as a start stop mechanism. In order
123
+ * to determine when to stop debezium engine we rely on few factors 1. TargetPosition logic. At the
124
+ * beginning of the sync we define a target position in the logs of the DB. This can be an LSN or
125
+ * anything specific to the DB which can help us identify that we have reached a specific position
126
+ * in the log based replication When we start processing records from debezium, we extract the the
127
+ * log position from the metadata of the record and compare it with our target that we defined at
128
+ * the beginning of the sync. If we have reached the target position, we shutdown the debezium
129
+ * engine 2. The TargetPosition logic might not always work and in order to tackle that we have
130
+ * another logic where if we do not receive records from debezium for a given duration, we ask
131
+ * debezium engine to shutdown 3. We also take the Snapshot into consideration, when a connector is
132
+ * running for the first time, we let it complete the snapshot and only after the completion of
133
+ * snapshot we should shutdown the engine. If we are closing the engine before completion of
134
+ * snapshot, we throw an exception
135
+ */
108
136
@ Override
109
137
public void close () throws Exception {
110
138
requestClose .call ();
139
+ throwExceptionIfSnapshotNotFinished ();
111
140
}
112
141
113
- private boolean shouldSignalClose (ChangeEvent <String , String > event ) {
114
-
115
- return targetPosition .reachedTargetPosition (Jsons .deserialize (event .value ()));
142
+ private boolean shouldSignalClose (JsonNode eventAsJson ) {
143
+ return targetPosition .reachedTargetPosition (eventAsJson );
116
144
}
117
145
118
146
private void requestClose () {
@@ -121,6 +149,13 @@ private void requestClose() {
121
149
} catch (Exception e ) {
122
150
throw new RuntimeException (e );
123
151
}
152
+ throwExceptionIfSnapshotNotFinished ();
153
+ }
154
+
155
+ private void throwExceptionIfSnapshotNotFinished () {
156
+ if (!hasSnapshotFinished ) {
157
+ throw new RuntimeException ("Closing down debezium engine but snapshot has not finished" );
158
+ }
124
159
}
125
160
126
161
private static class WaitTime {
0 commit comments