Skip to content

Commit 0385a64

Browse files
authored
🐛 Set cdc record subsequent record wait time to initial wait time as a workaround (#35114)
1 parent edcd5ed commit 0385a64

File tree

8 files changed

+55
-40
lines changed

8 files changed

+55
-40
lines changed

airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: source
77
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
8-
dockerImageTag: 1.2.8
8+
dockerImageTag: 1.2.9
99
dockerRepository: airbyte/source-mongodb-v2
1010
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
1111
githubIssueLabel: source-mongodb-v2

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConnectionUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public static MongoClient createMongoClient(final MongoDbSourceConfig config) {
5050
}
5151

5252
private static String buildConnectionString(final MongoDbSourceConfig config) {
53-
return MongoDbDebeziumPropertiesManager.buildConnectionString(config.rawConfig(), true);
53+
return MongoDbDebeziumPropertiesManager.buildConnectionString(config.getDatabaseConfig(), true);
5454
}
5555

5656
}

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public class MongoConstants {
3434
public static final String USERNAME_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.USERNAME_CONFIGURATION_KEY;
3535
public static final String SCHEMA_ENFORCED_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.SCHEMA_ENFORCED_CONFIGURATION_KEY;
3636
public static final String SCHEMALESS_MODE_DATA_FIELD = Configuration.SCHEMALESS_MODE_DATA_FIELD;
37+
public static final String INITIAL_RECORD_WAITING_TIME_SEC = "initial_waiting_seconds";
38+
public static final Integer DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC = 300;
3739

3840
private MongoConstants() {}
3941

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIG_CONFIGURATION_KEY;
1212
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_AUTH_SOURCE;
1313
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_DISCOVER_SAMPLE_SIZE;
14+
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC;
1415
import static io.airbyte.integrations.source.mongodb.MongoConstants.DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY;
16+
import static io.airbyte.integrations.source.mongodb.MongoConstants.INITIAL_RECORD_WAITING_TIME_SEC;
1517
import static io.airbyte.integrations.source.mongodb.MongoConstants.PASSWORD_CONFIGURATION_KEY;
1618
import static io.airbyte.integrations.source.mongodb.MongoConstants.SCHEMA_ENFORCED_CONFIGURATION_KEY;
1719
import static io.airbyte.integrations.source.mongodb.MongoConstants.USERNAME_CONFIGURATION_KEY;
@@ -27,33 +29,32 @@
2729
*/
2830
public record MongoDbSourceConfig(JsonNode rawConfig) {
2931

30-
/**
31-
* Constructs a new {@link MongoDbSourceConfig} from the provided raw configuration.
32-
*
33-
* @param rawConfig The underlying JSON configuration provided by the connector framework.
34-
* @throws IllegalArgumentException if the raw configuration does not contain the
35-
* {@link MongoConstants#DATABASE_CONFIG_CONFIGURATION_KEY} key.
36-
*/
37-
public MongoDbSourceConfig(final JsonNode rawConfig) {
38-
if (rawConfig.has(DATABASE_CONFIG_CONFIGURATION_KEY)) {
39-
this.rawConfig = rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY);
40-
} else {
32+
public MongoDbSourceConfig {
33+
if (rawConfig == null) {
34+
throw new IllegalArgumentException("MongoDbSourceConfig cannot accept a null config.");
35+
}
36+
if (!rawConfig.hasNonNull(DATABASE_CONFIG_CONFIGURATION_KEY)) {
4137
throw new IllegalArgumentException("Database configuration is missing required '" + DATABASE_CONFIG_CONFIGURATION_KEY + "' property.");
4238
}
4339
}
4440

41+
public JsonNode getDatabaseConfig() {
42+
return rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY);
43+
}
44+
4545
public String getAuthSource() {
46-
return rawConfig.has(AUTH_SOURCE_CONFIGURATION_KEY) ? rawConfig.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(DEFAULT_AUTH_SOURCE)
46+
return getDatabaseConfig().has(AUTH_SOURCE_CONFIGURATION_KEY) ? getDatabaseConfig().get(AUTH_SOURCE_CONFIGURATION_KEY).asText(DEFAULT_AUTH_SOURCE)
4747
: DEFAULT_AUTH_SOURCE;
4848
}
4949

5050
public Integer getCheckpointInterval() {
51-
return rawConfig.has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY) ? rawConfig.get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL)
51+
return getDatabaseConfig().has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY)
52+
? getDatabaseConfig().get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL)
5253
: CHECKPOINT_INTERVAL;
5354
}
5455

5556
public String getDatabaseName() {
56-
return rawConfig.has(DATABASE_CONFIGURATION_KEY) ? rawConfig.get(DATABASE_CONFIGURATION_KEY).asText() : null;
57+
return getDatabaseConfig().has(DATABASE_CONFIGURATION_KEY) ? getDatabaseConfig().get(DATABASE_CONFIGURATION_KEY).asText() : null;
5758
}
5859

5960
public OptionalInt getQueueSize() {
@@ -63,15 +64,15 @@ public OptionalInt getQueueSize() {
6364
}
6465

6566
public String getPassword() {
66-
return rawConfig.has(PASSWORD_CONFIGURATION_KEY) ? rawConfig.get(PASSWORD_CONFIGURATION_KEY).asText() : null;
67+
return getDatabaseConfig().has(PASSWORD_CONFIGURATION_KEY) ? getDatabaseConfig().get(PASSWORD_CONFIGURATION_KEY).asText() : null;
6768
}
6869

6970
public String getUsername() {
70-
return rawConfig.has(USERNAME_CONFIGURATION_KEY) ? rawConfig.get(USERNAME_CONFIGURATION_KEY).asText() : null;
71+
return getDatabaseConfig().has(USERNAME_CONFIGURATION_KEY) ? getDatabaseConfig().get(USERNAME_CONFIGURATION_KEY).asText() : null;
7172
}
7273

7374
public boolean hasAuthCredentials() {
74-
return rawConfig.has(USERNAME_CONFIGURATION_KEY) && rawConfig.has(PASSWORD_CONFIGURATION_KEY);
75+
return getDatabaseConfig().has(USERNAME_CONFIGURATION_KEY) && getDatabaseConfig().has(PASSWORD_CONFIGURATION_KEY);
7576
}
7677

7778
public Integer getSampleSize() {
@@ -83,8 +84,16 @@ public Integer getSampleSize() {
8384
}
8485

8586
public boolean getEnforceSchema() {
86-
return rawConfig.has(SCHEMA_ENFORCED_CONFIGURATION_KEY) ? rawConfig.get(SCHEMA_ENFORCED_CONFIGURATION_KEY).asBoolean(true)
87+
return getDatabaseConfig().has(SCHEMA_ENFORCED_CONFIGURATION_KEY) ? getDatabaseConfig().get(SCHEMA_ENFORCED_CONFIGURATION_KEY).asBoolean(true)
8788
: true;
8889
}
8990

91+
public Integer getInitialWaitingTimeSeconds() {
92+
if (rawConfig.has(INITIAL_RECORD_WAITING_TIME_SEC)) {
93+
return rawConfig.get(INITIAL_RECORD_WAITING_TIME_SEC).asInt(DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC);
94+
} else {
95+
return DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC;
96+
}
97+
}
98+
9099
}

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import com.mongodb.client.MongoClient;
1010
import com.mongodb.client.MongoDatabase;
1111
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
12-
import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil;
1312
import io.airbyte.commons.json.Jsons;
1413
import io.airbyte.commons.util.AutoCloseableIterator;
1514
import io.airbyte.commons.util.AutoCloseableIterators;
@@ -80,8 +79,11 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
8079
final Instant emittedAt,
8180
final MongoDbSourceConfig config) {
8281

83-
final Duration firstRecordWaitTime = RecordWaitTimeUtil.getFirstRecordWaitTime(config.rawConfig());
84-
final Duration subsequentRecordWaitTime = RecordWaitTimeUtil.getSubsequentRecordWaitTime(config.rawConfig());
82+
final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds());
83+
// #35059: debezium heartbeats are not sent on the expected interval. this is
84+
// a worksaround to allow making subsequent wait time configurable.
85+
final Duration subsequentRecordWaitTime = firstRecordWaitTime;
86+
LOGGER.info("Subsequent cdc record wait time: {} seconds", subsequentRecordWaitTime);
8587
final int queueSize = MongoUtil.getDebeziumEventQueueSize(config);
8688
final String databaseName = config.getDatabaseName();
8789
final boolean isEnforceSchema = config.getEnforceSchema();
@@ -97,7 +99,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
9799
Jsons.clone(defaultDebeziumProperties),
98100
catalog,
99101
cdcState.state(),
100-
config.rawConfig(),
102+
config.getDatabaseConfig(),
101103
mongoClient);
102104

103105
// We should always be able to extract offset out of state if it's not null
@@ -131,12 +133,12 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
131133
initialSnapshotHandler.getIterators(initialSnapshotStreams, stateManager, mongoClient.getDatabase(databaseName), cdcMetadataInjector,
132134
emittedAt, config.getCheckpointInterval(), isEnforceSchema);
133135

134-
final AirbyteDebeziumHandler<BsonTimestamp> handler = new AirbyteDebeziumHandler<>(config.rawConfig(),
136+
final AirbyteDebeziumHandler<BsonTimestamp> handler = new AirbyteDebeziumHandler<>(config.getDatabaseConfig(),
135137
new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize, false);
136138
final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager);
137139
final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed);
138-
final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.rawConfig(), catalog);
139-
final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.rawConfig());
140+
final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), catalog);
141+
final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.getDatabaseConfig());
140142

141143
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(
142144
propertiesManager, eventConverter, cdcSavedInfoFetcher, mongoDbCdcStateHandler);

airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,24 @@ void testCreatingMongoDbSourceConfig() {
3939
final String username = "username";
4040
final boolean isSchemaEnforced = false;
4141
final JsonNode rawConfig = Jsons.jsonNode(
42-
Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(
43-
AUTH_SOURCE_CONFIGURATION_KEY, authSource,
44-
CHECKPOINT_INTERVAL_CONFIGURATION_KEY, checkpointInterval,
45-
DATABASE_CONFIGURATION_KEY, database,
42+
Map.of(
4643
DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY, sampleSize,
47-
PASSWORD_CONFIGURATION_KEY, password,
4844
QUEUE_SIZE_CONFIGURATION_KEY, queueSize,
49-
USERNAME_CONFIGURATION_KEY, username,
50-
SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced)));
45+
DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(
46+
AUTH_SOURCE_CONFIGURATION_KEY, authSource,
47+
CHECKPOINT_INTERVAL_CONFIGURATION_KEY, checkpointInterval,
48+
DATABASE_CONFIGURATION_KEY, database,
49+
PASSWORD_CONFIGURATION_KEY, password,
50+
USERNAME_CONFIGURATION_KEY, username,
51+
SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced)));
5152
final MongoDbSourceConfig sourceConfig = new MongoDbSourceConfig(rawConfig);
5253
assertNotNull(sourceConfig);
5354
assertEquals(authSource, sourceConfig.getAuthSource());
5455
assertEquals(checkpointInterval, sourceConfig.getCheckpointInterval());
5556
assertEquals(database, sourceConfig.getDatabaseName());
5657
assertEquals(password, sourceConfig.getPassword());
5758
assertEquals(OptionalInt.of(queueSize), sourceConfig.getQueueSize());
58-
assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.rawConfig());
59+
assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.getDatabaseConfig());
5960
assertEquals(sampleSize, sourceConfig.getSampleSize());
6061
assertEquals(username, sourceConfig.getUsername());
6162
assertEquals(isSchemaEnforced, sourceConfig.getEnforceSchema());
@@ -76,7 +77,7 @@ void testDefaultValues() {
7677
assertEquals(null, sourceConfig.getDatabaseName());
7778
assertEquals(null, sourceConfig.getPassword());
7879
assertEquals(OptionalInt.empty(), sourceConfig.getQueueSize());
79-
assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.rawConfig());
80+
assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.getDatabaseConfig());
8081
assertEquals(DEFAULT_DISCOVER_SAMPLE_SIZE, sourceConfig.getSampleSize());
8182
assertEquals(null, sourceConfig.getUsername());
8283
}

airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoUtilTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,11 +312,11 @@ void testGetAuthorizedCollectionsMongoSecurityException() {
312312
void testGetDebeziumEventQueueSize() {
313313
final int queueSize = 5000;
314314
final MongoDbSourceConfig validQueueSizeConfiguration = new MongoDbSourceConfig(
315-
Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, queueSize))));
315+
Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, queueSize, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of())));
316316
final MongoDbSourceConfig tooSmallQueueSizeConfiguration = new MongoDbSourceConfig(
317-
Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MIN_VALUE))));
317+
Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MIN_VALUE, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of())));
318318
final MongoDbSourceConfig tooLargeQueueSizeConfiguration = new MongoDbSourceConfig(
319-
Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MAX_VALUE))));
319+
Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MAX_VALUE, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of())));
320320
final MongoDbSourceConfig missingQueueSizeConfiguration =
321321
new MongoDbSourceConfig(Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of())));
322322

docs/integrations/sources/mongodb-v2.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc
214214

215215
| Version | Date | Pull Request | Subject |
216216
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
217-
| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 |
217+
| 1.2.9 | 2024-02-13 | [35114](https://github.com/airbytehq/airbyte/pull/35114) | Extend subsequent cdc record wait time to the duration of initial. Bug Fixes |
218+
| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 |
218219
| 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. |
219220
| 1.2.6 | 2024-01-31 | [34594](https://github.com/airbytehq/airbyte/pull/34594) | Scope initial resume token to streams of interest. |
220221
| 1.2.5 | 2024-01-29 | [34641](https://github.com/airbytehq/airbyte/pull/34641) | Allow resuming an initial snapshot when Id type is not of default ObjectId . |

0 commit comments

Comments
 (0)