Skip to content

Commit 993aece

Browse files
authored
[Source-mongo] : Implement fixed chunk size (1million) (#37559)
1 parent 135e623 commit 993aece

File tree

5 files changed

+17
-6
lines changed

5 files changed

+17
-6
lines changed

airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ data:
88
connectorSubtype: database
99
connectorType: source
1010
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
11-
dockerImageTag: 1.3.7
11+
dockerImageTag: 1.3.8
1212
dockerRepository: airbyte/source-mongodb-v2
1313
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
1414
githubIssueLabel: source-mongodb-v2

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
public class InitialSnapshotHandler {
3232

3333
private static final Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotHandler.class);
34+
private static final int DEFAULT_CHUNK_SIZE = 1_000_000;
3435

3536
/**
3637
* For each given stream configured as incremental sync it will output an iterator that will
@@ -67,7 +68,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
6768
final Optional<MongoDbStreamState> existingState =
6869
stateManager.getStreamState(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
6970

70-
final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema);
71+
final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema, DEFAULT_CHUNK_SIZE);
7172
final var stateIterator =
7273
new SourceStateIterator<>(recordIterator, airbyteStream, stateManager, new StateEmitFrequency(checkpointInterval,
7374
MongoConstants.CHECKPOINT_DURATION));

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbInitialLoadRecordIterator.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class MongoDbInitialLoadRecordIterator extends AbstractIterator<Document>
3939
private final boolean isEnforceSchema;
4040
private final MongoCollection<Document> collection;
4141
private final Bson fields;
42+
// Represents the number of rows to get with each query.
43+
private final int chunkSize;
4244

4345
private Optional<MongoDbStreamState> currentState;
4446
private MongoCursor<Document> currentIterator;
@@ -48,19 +50,22 @@ public class MongoDbInitialLoadRecordIterator extends AbstractIterator<Document>
4850
MongoDbInitialLoadRecordIterator(final MongoCollection<Document> collection,
4951
final Bson fields,
5052
final Optional<MongoDbStreamState> existingState,
51-
final boolean isEnforceSchema) {
53+
final boolean isEnforceSchema,
54+
final int chunkSize) {
5255
this.collection = collection;
5356
this.fields = fields;
5457
this.currentState = existingState;
5558
this.isEnforceSchema = isEnforceSchema;
59+
this.chunkSize = chunkSize;
5660
this.currentIterator = buildNewQueryIterator();
5761
}
5862

5963
@Override
6064
protected Document computeNext() {
6165
if (shouldBuildNextQuery()) {
6266
try {
63-
LOGGER.info("Finishing subquery number : {}", numSubqueries);
67+
LOGGER.info("Finishing subquery number : {}, processing at id : {}", numSubqueries,
68+
currentState.get() == null ? "starting null" : currentState.get().id());
6469
currentIterator.close();
6570
currentIterator = buildNewQueryIterator();
6671
numSubqueries++;
@@ -98,11 +103,13 @@ private MongoCursor<Document> buildNewQueryIterator() {
98103
return isEnforceSchema ? collection.find()
99104
.filter(filter)
100105
.projection(fields)
106+
.limit(chunkSize)
101107
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
102108
.allowDiskUse(true)
103109
.cursor()
104110
: collection.find()
105111
.filter(filter)
112+
.limit(chunkSize)
106113
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
107114
.allowDiskUse(true)
108115
.cursor();

airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import static org.junit.jupiter.api.Assertions.assertTrue;
1515
import static org.mockito.ArgumentMatchers.any;
1616
import static org.mockito.ArgumentMatchers.anyBoolean;
17+
import static org.mockito.ArgumentMatchers.anyInt;
1718
import static org.mockito.ArgumentMatchers.anyList;
1819
import static org.mockito.Mockito.doCallRealMethod;
1920
import static org.mockito.Mockito.doReturn;
@@ -153,6 +154,7 @@ void setUp() {
153154
when(mongoCollection.find()).thenReturn(findIterable);
154155
when(findIterable.filter(any())).thenReturn(findIterable);
155156
when(findIterable.projection(any())).thenReturn(findIterable);
157+
when(findIterable.limit(anyInt())).thenReturn(findIterable);
156158
when(findIterable.sort(any())).thenReturn(findIterable);
157159
when(findIterable.cursor()).thenReturn(findCursor);
158160
when(findCursor.hasNext()).thenReturn(true);

docs/integrations/sources/mongodb-v2.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,10 @@ For more information regarding configuration parameters, please see [MongoDb Doc
221221

222222
| Version | Date | Pull Request | Subject |
223223
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
224+
| 1.3.8 | 2024-04-24 | [37559](https://github.com/airbytehq/airbyte/pull/37559) | Implement fixed-size chunking while performing initial load. |
224225
| 1.3.7 | 2024-04-24 | [37557](https://github.com/airbytehq/airbyte/pull/37557) | Change bug in resume token validity check. |
225-
| 1.3.6 | 2024-04-24 | [37525](https://github.com/airbytehq/airbyte/pull/37525) | Internal refactor. |
226-
| 1.3.5 | 2024-04-22 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Do not send estimate trace if we do not have data. |
226+
| 1.3.6 | 2024-04-24 | [37525](https://github.com/airbytehq/airbyte/pull/37525) | Internal refactor. |
227+
| 1.3.5 | 2024-04-22 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Do not send estimate trace if we do not have data. |
227228
| 1.3.4 | 2024-04-16 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Populate null values in airbyte record messages. |
228229
| 1.3.3 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. |
229230
| 1.3.2 | 2024-04-04 | [36845](https://github.com/airbytehq/airbyte/pull/36845) | Adopt Kotlin CDK. |

0 commit comments

Comments
 (0)