12
12
import com .mongodb .client .MongoDatabase ;
13
13
import io .airbyte .cdk .integrations .base .AirbyteTraceMessageUtility ;
14
14
import io .airbyte .cdk .integrations .debezium .AirbyteDebeziumHandler ;
15
+ import io .airbyte .cdk .integrations .source .relationaldb .InitialLoadTimeoutUtil ;
15
16
import io .airbyte .cdk .integrations .source .relationaldb .streamstatus .StreamStatusTraceEmitterIterator ;
16
17
import io .airbyte .commons .exceptions .ConfigErrorException ;
17
18
import io .airbyte .commons .json .Jsons ;
21
22
import io .airbyte .integrations .source .mongodb .InitialSnapshotHandler ;
22
23
import io .airbyte .integrations .source .mongodb .MongoDbSourceConfig ;
23
24
import io .airbyte .integrations .source .mongodb .MongoUtil ;
25
+ import io .airbyte .integrations .source .mongodb .state .InitialSnapshotStatus ;
24
26
import io .airbyte .integrations .source .mongodb .state .MongoDbStateManager ;
25
27
import io .airbyte .protocol .models .v0 .*;
26
28
import io .airbyte .protocol .models .v0 .AirbyteMessage ;
30
32
import java .time .Instant ;
31
33
import java .util .*;
32
34
import java .util .function .Supplier ;
35
+ import java .util .stream .Collectors ;
33
36
import java .util .stream .Stream ;
34
37
import org .bson .BsonDocument ;
35
38
import org .bson .BsonTimestamp ;
@@ -91,19 +94,25 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
91
94
// a workaround to allow making subsequent wait time configurable.
92
95
final Duration subsequentRecordWaitTime = firstRecordWaitTime ;
93
96
LOGGER .info ("Subsequent cdc record wait time: {} seconds" , subsequentRecordWaitTime );
97
+ final Duration initialLoadTimeout = InitialLoadTimeoutUtil .getInitialLoadTimeout (config .rawConfig ());
98
+
94
99
final int queueSize = MongoUtil .getDebeziumEventQueueSize (config );
95
100
final String databaseName = config .getDatabaseName ();
96
101
final boolean isEnforceSchema = config .getEnforceSchema ();
102
+
97
103
final Properties defaultDebeziumProperties = MongoDbCdcProperties .getDebeziumProperties ();
98
104
logOplogInfo (mongoClient );
105
+
99
106
final BsonDocument initialResumeToken =
100
107
MongoDbResumeTokenHelper .getMostRecentResumeToken (mongoClient , databaseName , incrementalOnlyStreamsCatalog );
101
108
final JsonNode initialDebeziumState =
102
109
mongoDbDebeziumStateUtil .constructInitialDebeziumState (initialResumeToken , databaseName );
110
+
103
111
final MongoDbCdcState cdcState =
104
112
(stateManager .getCdcState () == null || stateManager .getCdcState ().state () == null || stateManager .getCdcState ().state ().isNull ())
105
113
? new MongoDbCdcState (initialDebeziumState , isEnforceSchema )
106
114
: new MongoDbCdcState (Jsons .clone (stateManager .getCdcState ().state ()), stateManager .getCdcState ().schema_enforced ());
115
+
107
116
final Optional <BsonDocument > optSavedOffset = mongoDbDebeziumStateUtil .savedOffset (
108
117
Jsons .clone (defaultDebeziumProperties ),
109
118
incrementalOnlyStreamsCatalog ,
@@ -127,6 +136,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
127
136
throw new ConfigErrorException (
128
137
"Saved offset is not valid. Please reset the connection, and then increase oplog retention and/or increase sync frequency to prevent his from happening in the future. See https://docs.airbyte.com/integrations/sources/mongodb-v2#mongodb-oplog-and-change-streams for more details" );
129
138
}
139
+
130
140
LOGGER .info ("Saved offset is not valid. Airbyte will trigger a full refresh." );
131
141
// If the offset in the state is invalid, reset the state to the initial STATE
132
142
stateManager .resetState (new MongoDbCdcState (initialDebeziumState , config .getEnforceSchema ()));
@@ -145,33 +155,42 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
145
155
final List <ConfiguredAirbyteStream > initialSnapshotStreams =
146
156
MongoDbCdcInitialSnapshotUtils .getStreamsForInitialSnapshot (mongoClient , stateManager , incrementalOnlyStreamsCatalog , savedOffsetIsValid );
147
157
final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler ();
158
+
159
+ final Set <AirbyteStreamNameNamespacePair > streamsStillInInitialSnapshot = stateManager .getStreamStates ().entrySet ().stream ()
160
+ .filter (e -> InitialSnapshotStatus .IN_PROGRESS .equals (e .getValue ().status ()))
161
+ .map (Map .Entry ::getKey )
162
+ .collect (Collectors .toSet ());
163
+
164
+ // Fetch the streams from the catalog that still need to complete the initial snapshot sync
165
+ List <ConfiguredAirbyteStream > inProgressSnapshotStreams = new ArrayList <>(incrementalOnlyStreamsCatalog .getStreams ().stream ()
166
+ .filter (stream -> streamsStillInInitialSnapshot .contains (AirbyteStreamNameNamespacePair .fromAirbyteStream (stream .getStream ())))
167
+ .map (Jsons ::clone )
168
+ .toList ());
169
+ final var startedCdcStreamList = incrementalOnlyStreamsCatalog .getStreams ().stream ()
170
+ .filter (stream -> (!initialSnapshotStreams .contains (stream ) || inProgressSnapshotStreams .contains (stream )))
171
+ .map (stream -> stream .getStream ().getNamespace () + "." + stream .getStream ().getName ()).toList ();
172
+
148
173
final List <AutoCloseableIterator <AirbyteMessage >> initialSnapshotIterators =
149
174
initialSnapshotHandler .getIterators (initialSnapshotStreams , stateManager , mongoClient .getDatabase (databaseName ),
150
- config , true , false );
175
+ config , false , false , emittedAt , Optional . of ( initialLoadTimeout ) );
151
176
152
177
final AirbyteDebeziumHandler <BsonTimestamp > handler = new AirbyteDebeziumHandler <>(config .getDatabaseConfig (),
153
178
new MongoDbCdcTargetPosition (initialResumeToken ), false , firstRecordWaitTime , queueSize , false );
179
+
154
180
final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler (stateManager );
155
181
final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher (stateToBeUsed );
182
+
156
183
final var cdcStreamList = incrementalOnlyStreamsCatalog .getStreams ().stream ()
157
184
.filter (stream -> stream .getSyncMode () == SyncMode .INCREMENTAL )
158
185
.map (s -> s .getStream ().getNamespace () + "\\ ." + s .getStream ().getName ())
159
186
.toList ();
160
- final var propertiesManager =
161
- new MongoDbDebeziumPropertiesManager (defaultDebeziumProperties , config .getDatabaseConfig (), incrementalOnlyStreamsCatalog , cdcStreamList );
162
- final var eventConverter =
163
- new MongoDbDebeziumEventConverter (cdcMetadataInjector , incrementalOnlyStreamsCatalog , emittedAt , config .getDatabaseConfig ());
164
-
165
- final Supplier <AutoCloseableIterator <AirbyteMessage >> incrementalIteratorSupplier = () -> handler .getIncrementalIterators (
166
- propertiesManager , eventConverter , cdcSavedInfoFetcher , mongoDbCdcStateHandler );
167
187
168
188
// We can close the client after the initial snapshot is complete, incremental
169
189
// iterator does not make use of the client.
170
190
final AutoCloseableIterator <AirbyteMessage > initialSnapshotIterator = AutoCloseableIterators .appendOnClose (
171
191
AutoCloseableIterators .concatWithEagerClose (initialSnapshotIterators ), mongoClient ::close );
172
192
173
193
final List <AutoCloseableIterator <AirbyteMessage >> cdcStreamsStartStatusEmitters = incrementalOnlyStreamsCatalog .getStreams ().stream ()
174
- .filter (stream -> !initialSnapshotStreams .contains (stream ))
175
194
.map (stream -> (AutoCloseableIterator <AirbyteMessage >) new StreamStatusTraceEmitterIterator (new AirbyteStreamStatusHolder (
176
195
new io .airbyte .protocol .models .AirbyteStreamNameNamespacePair (stream .getStream ().getName (), stream .getStream ().getNamespace ()),
177
196
AirbyteStreamStatusTraceMessage .AirbyteStreamStatus .STARTED )))
@@ -183,9 +202,69 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
183
202
AirbyteStreamStatusTraceMessage .AirbyteStreamStatus .COMPLETE )))
184
203
.toList ();
185
204
186
- return Stream .of (Collections .singletonList (initialSnapshotIterator ), cdcStreamsStartStatusEmitters ,
187
- Collections .singletonList (AutoCloseableIterators .lazyIterator (incrementalIteratorSupplier , null )),
188
- cdcStreamsCompleteStatusEmitters ).flatMap (Collection ::stream ).toList ();
205
+ if (startedCdcStreamList .isEmpty ()) {
206
+ LOGGER .info ("First sync - no cdc streams have been completed or started" );
207
+ /*
208
+ * This is the first run case - no initial loads have been started. In this case, we want to run the
209
+ * iterators in the following order: 1. Run the initial load iterators. This step will time out and
210
+ * throw a transient error if run for too long (> 8hrs by default). 2. Run the debezium iterators
211
+ * with ALL of the incremental streams configured. This is because if step 1 completes, the initial
212
+ * load can be considered finished.
213
+ */
214
+ final var propertiesManager =
215
+ new MongoDbDebeziumPropertiesManager (defaultDebeziumProperties , config .getDatabaseConfig (), incrementalOnlyStreamsCatalog , cdcStreamList );
216
+ final var eventConverter =
217
+ new MongoDbDebeziumEventConverter (cdcMetadataInjector , incrementalOnlyStreamsCatalog , emittedAt , config .getDatabaseConfig ());
218
+ final Supplier <AutoCloseableIterator <AirbyteMessage >> incrementalIteratorSupplier = () -> handler .getIncrementalIterators (
219
+ propertiesManager , eventConverter , cdcSavedInfoFetcher , mongoDbCdcStateHandler );
220
+
221
+ return Stream .of (
222
+ cdcStreamsStartStatusEmitters ,
223
+ Collections .singletonList (initialSnapshotIterator ),
224
+ Collections .singletonList (AutoCloseableIterators .lazyIterator (incrementalIteratorSupplier , null )),
225
+ cdcStreamsCompleteStatusEmitters ).flatMap (Collection ::stream ).toList ();
226
+ } else if (initialSnapshotIterators .isEmpty ()) {
227
+ LOGGER .info ("Initial load has finished completely - only reading the oplog" );
228
+ /*
229
+ * In this case, the initial load has completed and only debezium should be run. The iterators
230
+ * should be run in the following order: 1. Run the debezium iterators with ALL of the incremental
231
+ * streams configured.
232
+ */
233
+ final var propertiesManager =
234
+ new MongoDbDebeziumPropertiesManager (defaultDebeziumProperties , config .getDatabaseConfig (), incrementalOnlyStreamsCatalog , cdcStreamList );
235
+ final var eventConverter =
236
+ new MongoDbDebeziumEventConverter (cdcMetadataInjector , incrementalOnlyStreamsCatalog , emittedAt , config .getDatabaseConfig ());
237
+ final Supplier <AutoCloseableIterator <AirbyteMessage >> incrementalIteratorSupplier = () -> handler .getIncrementalIterators (
238
+ propertiesManager , eventConverter , cdcSavedInfoFetcher , mongoDbCdcStateHandler );
239
+ return Stream .of (
240
+ cdcStreamsStartStatusEmitters ,
241
+ Collections .singletonList (AutoCloseableIterators .lazyIterator (incrementalIteratorSupplier , null )),
242
+ cdcStreamsCompleteStatusEmitters ).flatMap (Collection ::stream ).toList ();
243
+ } else {
244
+ LOGGER .info ("Initial load is in progress - reading oplog first and then resuming with initial load." );
245
+ /*
246
+ * In this case, the initial load has partially completed (WASS case). The iterators should be run
247
+ * in the following order: 1. Run the debezium iterators with only the incremental streams which
248
+ * have been fully or partially completed configured. 2. Resume initial load for partially completed
249
+ * and not started streams. This step will time out and throw a transient error if run for too long
250
+ * (> 8hrs by default). 3. Emit a transient error. This is to signal to the platform to restart the
251
+ * sync to clear the oplog. We cannot simply add the same cdc iterators as their target end position
252
+ * is fixed to the tip of the oplog at the start of the sync.
253
+ */
254
+ final var propertiesManager =
255
+ new MongoDbDebeziumPropertiesManager (defaultDebeziumProperties , config .getDatabaseConfig (), incrementalOnlyStreamsCatalog ,
256
+ startedCdcStreamList );
257
+ final var eventConverter =
258
+ new MongoDbDebeziumEventConverter (cdcMetadataInjector , incrementalOnlyStreamsCatalog , emittedAt , config .getDatabaseConfig ());
259
+ final Supplier <AutoCloseableIterator <AirbyteMessage >> incrementalIteratorSupplier = () -> handler .getIncrementalIterators (
260
+ propertiesManager , eventConverter , cdcSavedInfoFetcher , mongoDbCdcStateHandler );
261
+ return Stream .of (
262
+ cdcStreamsStartStatusEmitters ,
263
+ Collections .singletonList (AutoCloseableIterators .lazyIterator (incrementalIteratorSupplier , null )),
264
+ Collections .singletonList (initialSnapshotIterator ),
265
+ cdcStreamsCompleteStatusEmitters )
266
+ .flatMap (Collection ::stream ).toList ();
267
+ }
189
268
}
190
269
191
270
private void logOplogInfo (final MongoClient mongoClient ) {
0 commit comments