Skip to content

[Source-mongo] : Implement fixed chunk size (1million) #37559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.3.7
dockerImageTag: 1.3.8
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public class InitialSnapshotHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotHandler.class);
private static final int DEFAULT_CHUNK_SIZE = 1_000_000;

/**
* For each given stream configured as incremental sync it will output an iterator that will
Expand Down Expand Up @@ -67,7 +68,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
final Optional<MongoDbStreamState> existingState =
stateManager.getStreamState(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());

final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema);
final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema, DEFAULT_CHUNK_SIZE);
final var stateIterator =
new SourceStateIterator<>(recordIterator, airbyteStream, stateManager, new StateEmitFrequency(checkpointInterval,
MongoConstants.CHECKPOINT_DURATION));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class MongoDbInitialLoadRecordIterator extends AbstractIterator<Document>
private final boolean isEnforceSchema;
private final MongoCollection<Document> collection;
private final Bson fields;
// Represents the number of rows to get with each query.
private final int chunkSize;

private Optional<MongoDbStreamState> currentState;
private MongoCursor<Document> currentIterator;
Expand All @@ -48,19 +50,22 @@ public class MongoDbInitialLoadRecordIterator extends AbstractIterator<Document>
MongoDbInitialLoadRecordIterator(final MongoCollection<Document> collection,
final Bson fields,
final Optional<MongoDbStreamState> existingState,
final boolean isEnforceSchema) {
final boolean isEnforceSchema,
final int chunkSize) {
this.collection = collection;
this.fields = fields;
this.currentState = existingState;
this.isEnforceSchema = isEnforceSchema;
this.chunkSize = chunkSize;
this.currentIterator = buildNewQueryIterator();
}

@Override
protected Document computeNext() {
if (shouldBuildNextQuery()) {
try {
LOGGER.info("Finishing subquery number : {}", numSubqueries);
LOGGER.info("Finishing subquery number : {}, processing at id : {}", numSubqueries,
currentState.get() == null ? "starting null" : currentState.get().id());
currentIterator.close();
currentIterator = buildNewQueryIterator();
numSubqueries++;
Expand Down Expand Up @@ -98,11 +103,13 @@ private MongoCursor<Document> buildNewQueryIterator() {
return isEnforceSchema ? collection.find()
.filter(filter)
.projection(fields)
.limit(chunkSize)
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
.allowDiskUse(true)
.cursor()
: collection.find()
.filter(filter)
.limit(chunkSize)
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
.allowDiskUse(true)
.cursor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -153,6 +154,7 @@ void setUp() {
when(mongoCollection.find()).thenReturn(findIterable);
when(findIterable.filter(any())).thenReturn(findIterable);
when(findIterable.projection(any())).thenReturn(findIterable);
when(findIterable.limit(anyInt())).thenReturn(findIterable);
when(findIterable.sort(any())).thenReturn(findIterable);
when(findIterable.cursor()).thenReturn(findCursor);
when(findCursor.hasNext()).thenReturn(true);
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.3.8 | 2024-04-24 | [37559](https://github.com/airbytehq/airbyte/pull/37559) | Implement fixed-size chunking while performing initial load. |
| 1.3.7 | 2024-04-24 | [37557](https://github.com/airbytehq/airbyte/pull/37557) | Change bug in resume token validity check. |
| 1.3.6 | 2024-04-24 | [37525](https://github.com/airbytehq/airbyte/pull/37525) | Internal refactor. |
| 1.3.5 | 2024-04-22 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Do not send estimate trace if we do not have data. |
| 1.3.6 | 2024-04-24 | [37525](https://github.com/airbytehq/airbyte/pull/37525) | Internal refactor. |
| 1.3.5 | 2024-04-22 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Do not send estimate trace if we do not have data. |
| 1.3.4 | 2024-04-16 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Populate null values in airbyte record messages. |
| 1.3.3 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. |
| 1.3.2 | 2024-04-04 | [36845](https://github.com/airbytehq/airbyte/pull/36845) | Adopt Kotlin CDK. |
Expand Down