Skip to content

[Source-mongo] : Adding a new iterator in preparation of adding chunking subqueries #37525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.3.5
dockerImageTag: 1.3.6
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.util.List;
import java.util.Optional;
import org.bson.*;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -69,41 +67,11 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
final Optional<MongoDbStreamState> existingState =
stateManager.getStreamState(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());

// The filter determines the starting point of this iterator based on the state of this collection.
// If a state exists, it will use that state to create a query akin to
// "where _id > [last saved state] order by _id ASC".
// If no state exists, it will create a query akin to "where 1=1 order by _id ASC"
final Bson filter = existingState
// Full refresh streams that finished set their id to null
// This tells us to start over
.filter(state -> state.id() != null)
.map(state -> Filters.gt(MongoConstants.ID_FIELD,
switch (state.idType()) {
case STRING -> new BsonString(state.id());
case OBJECT_ID -> new BsonObjectId(new ObjectId(state.id()));
case INT -> new BsonInt32(Integer.parseInt(state.id()));
case LONG -> new BsonInt64(Long.parseLong(state.id()));
}))
// if nothing was found, return a new BsonDocument
.orElseGet(BsonDocument::new);

// When schema is enforced we query for the selected fields
// Otherwise we retreive the entire set of fields
final var cursor = isEnforceSchema ? collection.find()
.filter(filter)
.projection(fields)
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
.allowDiskUse(true)
.cursor()
: collection.find()
.filter(filter)
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
.allowDiskUse(true)
.cursor();
final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema);
final var stateIterator =
new SourceStateIterator<>(cursor, airbyteStream, stateManager, new StateEmitFrequency(checkpointInterval,
new SourceStateIterator<>(recordIterator, airbyteStream, stateManager, new StateEmitFrequency(checkpointInterval,
MongoConstants.CHECKPOINT_DURATION));
return AutoCloseableIterators.fromIterator(stateIterator, cursor::close, null);
return AutoCloseableIterators.fromIterator(stateIterator, recordIterator::close, null);
})
.toList();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb;

import static io.airbyte.integrations.source.mongodb.state.InitialSnapshotStatus.IN_PROGRESS;

import com.google.common.collect.AbstractIterator;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.source.mongodb.state.IdType;
import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState;
import java.util.Optional;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonObjectId;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This record iterator operates over a single stream. It continuously reads data from a table via
* multiple queries with the configured chunk size until the entire table is processed. The next
* query uses the highest watermark of the primary key seen in the previous subquery.
*/
public class MongoDbInitialLoadRecordIterator extends AbstractIterator<Document>
implements AutoCloseableIterator<Document> {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbInitialLoadRecordIterator.class);
private final boolean isEnforceSchema;
private final MongoCollection<Document> collection;
private final Bson fields;

private Optional<MongoDbStreamState> currentState;
private MongoCursor<Document> currentIterator;

private int numSubqueries = 0;

MongoDbInitialLoadRecordIterator(final MongoCollection<Document> collection,
final Bson fields,
final Optional<MongoDbStreamState> existingState,
final boolean isEnforceSchema) {
this.collection = collection;
this.fields = fields;
this.currentState = existingState;
this.isEnforceSchema = isEnforceSchema;
this.currentIterator = buildNewQueryIterator();
}

@Override
protected Document computeNext() {
if (shouldBuildNextQuery()) {
try {
LOGGER.info("Finishing subquery number : {}", numSubqueries);
currentIterator.close();
currentIterator = buildNewQueryIterator();
numSubqueries++;
if (!currentIterator.hasNext()) {
return endOfData();
}
} catch (final Exception e) {
return endOfData();
}
}
// Get the new _id field to start the next subquery from.
Document next = currentIterator.next();
currentState = getCurrentState(next.get(MongoConstants.ID_FIELD));
return next;
}

private Optional<MongoDbStreamState> getCurrentState(Object currentId) {
final var idType = IdType.findByJavaType(currentId.getClass().getSimpleName())
.orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + currentId.getClass().getSimpleName()));
final var state = new MongoDbStreamState(currentId.toString(),
IN_PROGRESS,
idType);
return Optional.of(state);
}

@Override
public void close() throws Exception {
if (currentIterator != null) {
currentIterator.close();
}
}

private MongoCursor<Document> buildNewQueryIterator() {
Bson filter = buildFilter();
return isEnforceSchema ? collection.find()
.filter(filter)
.projection(fields)
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
.allowDiskUse(true)
.cursor()
: collection.find()
.filter(filter)
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
.allowDiskUse(true)
.cursor();
}

private Bson buildFilter() {
// The filter determines the starting point of this iterator based on the state of this collection.
// If a state exists, it will use that state to create a query akin to
// "where _id > [last saved state] order by _id ASC".
// If no state exists, it will create a query akin to "where 1=1 order by _id ASC"
return currentState
// Full refresh streams that finished set their id to null
// This tells us to start over
.filter(state -> state.id() != null)
.map(state -> Filters.gt(MongoConstants.ID_FIELD,
switch (state.idType()) {
case STRING -> new BsonString(state.id());
case OBJECT_ID -> new BsonObjectId(new ObjectId(state.id()));
case INT -> new BsonInt32(Integer.parseInt(state.id()));
case LONG -> new BsonInt64(Long.parseLong(state.id()));
}))
// if nothing was found, return a new BsonDocument
.orElseGet(BsonDocument::new);
}

private boolean shouldBuildNextQuery() {
// The next sub-query should be built if the previous subquery has finished.
return !currentIterator.hasNext();
}

}
Loading