Skip to content

Commit 48930e4

Browse files
authored
[Source-mongo] : Chunk size(limit) should correspond to ~1GB of data (#37753)
1 parent bdb3556 commit 48930e4

File tree

7 files changed

+83
-13
lines changed

7 files changed

+83
-13
lines changed

airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ data:
88
connectorSubtype: database
99
connectorType: source
1010
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
11-
dockerImageTag: 1.3.10
11+
dockerImageTag: 1.3.11
1212
dockerRepository: airbyte/source-mongodb-v2
1313
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
1414
githubIssueLabel: source-mongodb-v2

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.airbyte.commons.exceptions.ConfigErrorException;
1313
import io.airbyte.commons.util.AutoCloseableIterator;
1414
import io.airbyte.commons.util.AutoCloseableIterators;
15+
import io.airbyte.integrations.source.mongodb.MongoUtil.CollectionStatistics;
1516
import io.airbyte.integrations.source.mongodb.state.IdType;
1617
import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager;
1718
import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState;
@@ -31,7 +32,6 @@
3132
public class InitialSnapshotHandler {
3233

3334
private static final Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotHandler.class);
34-
private static final int DEFAULT_CHUNK_SIZE = 1_000_000;
3535

3636
/**
3737
* For each given stream configured as incremental sync it will output an iterator that will
@@ -68,7 +68,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
6868
final Optional<MongoDbStreamState> existingState =
6969
stateManager.getStreamState(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
7070

71-
final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema, DEFAULT_CHUNK_SIZE);
71+
final Optional<CollectionStatistics> collectionStatistics = MongoUtil.getCollectionStatistics(database, airbyteStream);
72+
final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema,
73+
MongoUtil.getChunkSizeForCollection(collectionStatistics, airbyteStream));
7274
final var stateIterator =
7375
new SourceStateIterator<>(recordIterator, airbyteStream, stateManager, new StateEmitFrequency(checkpointInterval,
7476
MongoConstants.CHECKPOINT_DURATION));

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoUtil.java

+40-2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ public class MongoUtil {
5454
*/
5555
private static final Set<String> IGNORED_COLLECTIONS = Set.of("system.", "replset.", "oplog.");
5656

57+
@VisibleForTesting
58+
static final int DEFAULT_CHUNK_SIZE = 1_000_000;
59+
@VisibleForTesting
60+
static final long QUERY_TARGET_SIZE_GB = 1_073_741_824;
61+
5762
/**
5863
* The minimum size of the Debezium event queue. This value will be selected if the provided
5964
* configuration value for the queue size is less than this value
@@ -174,10 +179,9 @@ public static int getDebeziumEventQueueSize(final MongoDbSourceConfig config) {
174179
* @return The {@link CollectionStatistics} of the collection or an empty {@link Optional} if the
175180
* statistics cannot be retrieved.
176181
*/
177-
public static Optional<CollectionStatistics> getCollectionStatistics(final MongoClient mongoClient, final ConfiguredAirbyteStream stream) {
182+
public static Optional<CollectionStatistics> getCollectionStatistics(final MongoDatabase mongoDatabase, final ConfiguredAirbyteStream stream) {
178183
try {
179184
final Map<String, Object> collStats = Map.of(MongoConstants.STORAGE_STATS_KEY, Map.of(), MongoConstants.COUNT_KEY, Map.of());
180-
final MongoDatabase mongoDatabase = mongoClient.getDatabase(stream.getStream().getNamespace());
181185
final MongoCollection<Document> collection = mongoDatabase.getCollection(stream.getStream().getName());
182186
final AggregateIterable<Document> output = collection.aggregate(List.of(new Document("$collStats", collStats)));
183187

@@ -206,6 +210,40 @@ public static Optional<CollectionStatistics> getCollectionStatistics(final Mongo
206210
return Optional.empty();
207211
}
208212

213+
public static int getChunkSizeForCollection(final Optional<CollectionStatistics> collectionStatistics, final ConfiguredAirbyteStream stream) {
214+
// If table size info could not be calculated, a default chunk size will be provided.
215+
if (collectionStatistics.isEmpty() || shouldUseDefaultChunkSize(collectionStatistics.get())) {
216+
LOGGER.info("Chunk size could not be determined for: {}.{}, defaulting to {} rows", stream.getStream().getNamespace(),
217+
stream.getStream().getName(), DEFAULT_CHUNK_SIZE);
218+
return DEFAULT_CHUNK_SIZE;
219+
}
220+
CollectionStatistics stats = collectionStatistics.get();
221+
final long totalRows = stats.count().longValue();
222+
final long totalBytes = stats.size().longValue();
223+
final long bytesPerRow = totalBytes / totalRows;
224+
if (bytesPerRow == 0) {
225+
LOGGER.info("Chunk size could not be determined for: {}.{}, defaulting to {} rows", stream.getStream().getNamespace(),
226+
stream.getStream().getName(), DEFAULT_CHUNK_SIZE);
227+
return DEFAULT_CHUNK_SIZE;
228+
}
229+
// Otherwise the chunk size is essentially the limit - the number of rows to fetch per query. This
230+
// number is the number of rows that would
231+
// correspond to roughly ~1GB of data.
232+
final int chunkSize = (int) (QUERY_TARGET_SIZE_GB / bytesPerRow);
233+
if (chunkSize <= 0) {
234+
LOGGER.info("Chunk size could not be determined for: {}.{}, defaulting to {} rows", stream.getStream().getNamespace(),
235+
stream.getStream().getName(), DEFAULT_CHUNK_SIZE);
236+
return DEFAULT_CHUNK_SIZE;
237+
}
238+
LOGGER.info("Chunk size determined for: {}.{}, to be {} rows", stream.getStream().getNamespace(),
239+
stream.getStream().getName(), chunkSize);
240+
return chunkSize;
241+
}
242+
243+
private static boolean shouldUseDefaultChunkSize(CollectionStatistics stats) {
244+
return stats.size().longValue() == 0 || stats.count().longValue() == 0;
245+
}
246+
209247
/**
210248
* Checks whether the user's config + catalog does not match. This can happen in the following cases
211249
* : 1. User is in schemaless mode + catalog corresponds to schema enabled mode. 2. User is in

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitialSnapshotUtils.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ private static List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final Con
124124
}
125125

126126
private static void estimateInitialSnapshotSyncSize(final MongoClient mongoClient, final ConfiguredAirbyteStream stream) {
127-
final Optional<MongoUtil.CollectionStatistics> collectionStatistics = MongoUtil.getCollectionStatistics(mongoClient, stream);
127+
final Optional<MongoUtil.CollectionStatistics> collectionStatistics =
128+
MongoUtil.getCollectionStatistics(mongoClient.getDatabase(stream.getStream().getNamespace()), stream);
128129
collectionStatistics.ifPresent(c -> {
129130
AirbyteTraceMessageUtility.emitEstimateTrace(PLATFORM_DATA_INCREASE_FACTOR * c.size().longValue(),
130131
AirbyteEstimateTraceMessage.Type.STREAM, c.count().longValue(), stream.getStream().getName(), stream.getStream().getNamespace());

airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoUtilTest.java

+32-6
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
import static io.airbyte.integrations.source.mongodb.MongoCatalogHelper.AIRBYTE_STREAM_PROPERTIES;
1010
import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIG_CONFIGURATION_KEY;
1111
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_DISCOVER_SAMPLE_SIZE;
12+
import static io.airbyte.integrations.source.mongodb.MongoUtil.DEFAULT_CHUNK_SIZE;
1213
import static io.airbyte.integrations.source.mongodb.MongoUtil.MAX_QUEUE_SIZE;
1314
import static io.airbyte.integrations.source.mongodb.MongoUtil.MIN_QUEUE_SIZE;
15+
import static io.airbyte.integrations.source.mongodb.MongoUtil.QUERY_TARGET_SIZE_GB;
1416
import static io.airbyte.integrations.source.mongodb.MongoUtil.checkSchemaModeMismatch;
1517
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
1618
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
@@ -39,6 +41,7 @@
3941
import io.airbyte.commons.exceptions.ConfigErrorException;
4042
import io.airbyte.commons.json.Jsons;
4143
import io.airbyte.commons.resources.MoreResources;
44+
import io.airbyte.integrations.source.mongodb.MongoUtil.CollectionStatistics;
4245
import io.airbyte.integrations.source.mongodb.cdc.MongoDbCdcConnectorMetadataInjector;
4346
import io.airbyte.protocol.models.JsonSchemaType;
4447
import io.airbyte.protocol.models.v0.AirbyteStream;
@@ -349,7 +352,7 @@ void testGetCollectionStatistics() throws IOException {
349352
when(mongoClient.getDatabase(databaseName)).thenReturn(mongoDatabase);
350353
when(aggregateIterable.allowDiskUse(anyBoolean())).thenReturn(aggregateIterable);
351354

352-
final Optional<MongoUtil.CollectionStatistics> statistics = MongoUtil.getCollectionStatistics(mongoClient, configuredAirbyteStream);
355+
final Optional<MongoUtil.CollectionStatistics> statistics = MongoUtil.getCollectionStatistics(mongoDatabase, configuredAirbyteStream);
353356

354357
assertTrue(statistics.isPresent());
355358
assertEquals(746, statistics.get().count());
@@ -375,7 +378,7 @@ void testGetCollectionStatisticsNoResult() {
375378
when(mongoDatabase.getCollection(collectionName)).thenReturn(mongoCollection);
376379
when(mongoClient.getDatabase(databaseName)).thenReturn(mongoDatabase);
377380

378-
final Optional<MongoUtil.CollectionStatistics> statistics = MongoUtil.getCollectionStatistics(mongoClient, configuredAirbyteStream);
381+
final Optional<MongoUtil.CollectionStatistics> statistics = MongoUtil.getCollectionStatistics(mongoDatabase, configuredAirbyteStream);
379382

380383
assertFalse(statistics.isPresent());
381384
}
@@ -401,7 +404,7 @@ void testGetCollectionStatisticsEmptyResult() {
401404
when(mongoDatabase.getCollection(collectionName)).thenReturn(mongoCollection);
402405
when(mongoClient.getDatabase(databaseName)).thenReturn(mongoDatabase);
403406

404-
final Optional<MongoUtil.CollectionStatistics> statistics = MongoUtil.getCollectionStatistics(mongoClient, configuredAirbyteStream);
407+
final Optional<MongoUtil.CollectionStatistics> statistics = MongoUtil.getCollectionStatistics(mongoDatabase, configuredAirbyteStream);
405408

406409
assertFalse(statistics.isPresent());
407410
}
@@ -410,19 +413,42 @@ void testGetCollectionStatisticsEmptyResult() {
410413
void testGetCollectionStatisticsException() {
411414
final String collectionName = "test-collection";
412415
final String databaseName = "test-database";
413-
final MongoClient mongoClient = mock(MongoClient.class);
416+
final MongoDatabase mongoDatabase = mock(MongoDatabase.class);
414417

415418
final AirbyteStream stream = new AirbyteStream().withName(collectionName).withNamespace(databaseName);
416419
final ConfiguredAirbyteStream configuredAirbyteStream = new ConfiguredAirbyteStream().withStream(stream);
417420

418-
when(mongoClient.getDatabase(databaseName)).thenThrow(new IllegalArgumentException("test"));
421+
when(mongoDatabase.getCollection(collectionName)).thenThrow(new IllegalArgumentException("test"));
419422

420-
final Optional<MongoUtil.CollectionStatistics> statistics = MongoUtil.getCollectionStatistics(mongoClient, configuredAirbyteStream);
423+
final Optional<MongoUtil.CollectionStatistics> statistics = MongoUtil.getCollectionStatistics(mongoDatabase, configuredAirbyteStream);
421424

422425
assertFalse(statistics.isPresent());
423426

424427
}
425428

429+
@Test
430+
void testChunkSize() {
431+
final String collectionName = "test-collection";
432+
final String databaseName = "test-database";
433+
final AirbyteStream stream = new AirbyteStream().withName(collectionName).withNamespace(databaseName);
434+
final ConfiguredAirbyteStream configuredAirbyteStream = new ConfiguredAirbyteStream().withStream(stream);
435+
436+
// Assert that the default chunk size is returned
437+
assertThat(MongoUtil.getChunkSizeForCollection(Optional.empty(), configuredAirbyteStream)).isEqualTo(1_000_000);
438+
assertThat(MongoUtil.getChunkSizeForCollection(Optional.of(new CollectionStatistics(0, 0)), configuredAirbyteStream))
439+
.isEqualTo(DEFAULT_CHUNK_SIZE);
440+
assertThat(MongoUtil.getChunkSizeForCollection(Optional.of(new CollectionStatistics(0, 1000)), configuredAirbyteStream))
441+
.isEqualTo(DEFAULT_CHUNK_SIZE);
442+
assertThat(MongoUtil.getChunkSizeForCollection(Optional.of(new CollectionStatistics(1000, 0)), configuredAirbyteStream))
443+
.isEqualTo(DEFAULT_CHUNK_SIZE);
444+
assertThat(MongoUtil.getChunkSizeForCollection(Optional.of(new CollectionStatistics(1000, 999)), configuredAirbyteStream))
445+
.isEqualTo(DEFAULT_CHUNK_SIZE);
446+
447+
assertThat(
448+
MongoUtil.getChunkSizeForCollection(Optional.of(new CollectionStatistics(1_000_000, 10 * QUERY_TARGET_SIZE_GB)), configuredAirbyteStream))
449+
.isEqualTo(100_003);
450+
}
451+
426452
private static String formatMismatchException(final boolean isConfigSchemaEnforced,
427453
final boolean isCatalogSchemaEnforcing,
428454
final boolean isStateSchemaEnforced) {

airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitialSnapshotUtilsTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ void testRetrieveInitialSnapshotIteratorsInvalidSavedOffset() {
106106
void testFailureToGenerateEstimateDoesNotImpactSync() {
107107
final MongoDbStateManager stateManager = mock(MongoDbStateManager.class);
108108
final MongoClient mongoClient = mock(MongoClient.class);
109+
final MongoDatabase mongoDatabase = mock(MongoDatabase.class);
109110
final ConfiguredAirbyteStream completedStream = createConfiguredAirbyteStream(COMPLETED_NAME, NAMESPACE);
110111
final ConfiguredAirbyteStream inProgressStream = createConfiguredAirbyteStream(IN_PROGRESS_NAME, NAMESPACE);
111112
final ConfiguredAirbyteStream newStream = createConfiguredAirbyteStream(NEW_NAME, NAMESPACE);
@@ -117,7 +118,8 @@ void testFailureToGenerateEstimateDoesNotImpactSync() {
117118
new AirbyteStreamNameNamespacePair(COMPLETED_NAME, NAMESPACE), new MongoDbStreamState("1", InitialSnapshotStatus.COMPLETE, IdType.OBJECT_ID),
118119
new AirbyteStreamNameNamespacePair(IN_PROGRESS_NAME, NAMESPACE),
119120
new MongoDbStreamState("2", InitialSnapshotStatus.IN_PROGRESS, IdType.OBJECT_ID)));
120-
when(mongoClient.getDatabase(NAMESPACE)).thenThrow(new IllegalArgumentException("test"));
121+
when(mongoClient.getDatabase(NAMESPACE)).thenReturn(mongoDatabase);
122+
when(mongoDatabase.getCollection(NEW_NAME)).thenThrow(new IllegalArgumentException("test"));
121123

122124
final List<ConfiguredAirbyteStream> initialSnapshotStreams =
123125
MongoDbCdcInitialSnapshotUtils.getStreamsForInitialSnapshot(mongoClient, stateManager, catalog, savedOffsetIsValid);

docs/integrations/sources/mongodb-v2.md

+1
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc
221221

222222
| Version | Date | Pull Request | Subject |
223223
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
224+
| 1.3.11 | 2024-05-02 | [37753](https://github.com/airbytehq/airbyte/pull/37753) | Chunk size(limit) should correspond to ~1GB of data. |
224225
| 1.3.10 | 2024-05-02 | [37781](https://github.com/airbytehq/airbyte/pull/37781) | Adopt latest CDK. |
225226
| 1.3.9 | 2024-05-01 | [37742](https://github.com/airbytehq/airbyte/pull/37742) | Adopt latest CDK. Remove Debezium retries. |
226227
| 1.3.8 | 2024-04-24 | [37559](https://github.com/airbytehq/airbyte/pull/37559) | Implement fixed-size chunking while performing initial load. |

0 commit comments

Comments
 (0)