Skip to content

Commit 8eb634c

Browse files
authored
[Source-mongodb-v2] : Fix logic in determining whether a resume token is valid or not (#37557)
1 parent 86ee91e commit 8eb634c

File tree

5 files changed

+48
-14
lines changed

5 files changed

+48
-14
lines changed

airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ data:
88
connectorSubtype: database
99
connectorType: source
1010
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
11-
dockerImageTag: 1.3.6
11+
dockerImageTag: 1.3.7
1212
dockerRepository: airbyte/source-mongodb-v2
1313
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
1414
githubIssueLabel: source-mongodb-v2

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
116116
}
117117

118118
final boolean savedOffsetIsValid =
119-
optSavedOffset.filter(savedOffset -> mongoDbDebeziumStateUtil.isValidResumeToken(savedOffset, mongoClient)).isPresent();
119+
optSavedOffset
120+
.filter(savedOffset -> mongoDbDebeziumStateUtil.isValidResumeToken(savedOffset, mongoClient, databaseName, incrementalOnlyStreamsCatalog))
121+
.isPresent();
120122

121123
if (!savedOffsetIsValid) {
122124
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtil.java

+24-7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import com.mongodb.MongoCommandException;
1010
import com.mongodb.client.ChangeStreamIterable;
1111
import com.mongodb.client.MongoClient;
12+
import com.mongodb.client.model.Aggregates;
13+
import com.mongodb.client.model.Filters;
1214
import io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
1315
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
1416
import io.airbyte.cdk.integrations.debezium.internals.DebeziumStateUtil;
@@ -23,6 +25,7 @@
2325
import io.debezium.connector.mongodb.ReplicaSets;
2426
import io.debezium.connector.mongodb.ResumeTokens;
2527
import java.util.Collection;
28+
import java.util.Collections;
2629
import java.util.LinkedHashMap;
2730
import java.util.LinkedList;
2831
import java.util.List;
@@ -35,6 +38,7 @@
3538
import org.bson.BsonDocument;
3639
import org.bson.BsonString;
3740
import org.bson.BsonTimestamp;
41+
import org.bson.conversions.Bson;
3842
import org.slf4j.Logger;
3943
import org.slf4j.LoggerFactory;
4044

@@ -103,25 +107,38 @@ public static String getReplicaSetName(final MongoClient mongoClient) {
103107
*
104108
* @param savedOffset The resume token from the saved offset.
105109
* @param mongoClient The {@link MongoClient} used to validate the saved offset.
110+
*
106111
* @return {@code true} if the saved offset value is valid. Otherwise, {@code false} is returned to
107112
* indicate that an initial snapshot should be performed.
108113
*/
109-
public boolean isValidResumeToken(final BsonDocument savedOffset, final MongoClient mongoClient) {
114+
public boolean isValidResumeToken(final BsonDocument savedOffset,
115+
final MongoClient mongoClient,
116+
final String databaseName,
117+
final ConfiguredAirbyteCatalog catalog) {
110118
if (Objects.isNull(savedOffset) || savedOffset.isEmpty()) {
111119
return true;
112120
}
113121

114-
final ChangeStreamIterable<BsonDocument> stream = mongoClient.watch(BsonDocument.class);
115-
stream.resumeAfter(savedOffset);
116-
try (final var ignored = stream.cursor()) {
122+
// Scope the change stream to the collections & database of interest - this mirrors the logic while
123+
// getting the most recent resume token.
124+
final List<String> collectionsList = catalog.getStreams().stream()
125+
.map(s -> s.getStream().getName())
126+
.toList();
127+
final List<Bson> pipeline = Collections.singletonList(Aggregates.match(
128+
Filters.in("ns.coll", collectionsList)));
129+
final ChangeStreamIterable<BsonDocument> eventStream = mongoClient.getDatabase(databaseName).watch(pipeline, BsonDocument.class);
130+
131+
// Attempt to start the stream after the saved offset.
132+
eventStream.resumeAfter(savedOffset);
133+
try (final var ignored = eventStream.cursor()) {
117134
LOGGER.info("Valid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Incremental sync will be performed for "
118135
+ "up-to-date streams.",
119136
ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime());
120137
return true;
121138
} catch (final MongoCommandException | MongoChangeStreamException e) {
122-
LOGGER.info("Invalid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Initial snapshot will be performed for "
123-
+ "all streams.",
124-
ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime());
139+
LOGGER.info("Exception : {}", e.getMessage());
140+
LOGGER.info("Invalid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}, due to reason {}",
141+
ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime(), e.getMessage());
125142
return false;
126143
}
127144
}

airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtilTest.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import com.mongodb.client.ChangeStreamIterable;
1919
import com.mongodb.client.MongoChangeStreamCursor;
2020
import com.mongodb.client.MongoClient;
21+
import com.mongodb.client.MongoDatabase;
22+
import com.mongodb.client.model.Aggregates;
23+
import com.mongodb.client.model.Filters;
2124
import com.mongodb.client.model.changestream.ChangeStreamDocument;
2225
import com.mongodb.connection.ClusterDescription;
2326
import com.mongodb.connection.ClusterType;
@@ -30,12 +33,14 @@
3033
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
3134
import io.airbyte.protocol.models.v0.SyncMode;
3235
import io.debezium.connector.mongodb.ResumeTokens;
36+
import java.util.Collections;
3337
import java.util.List;
3438
import java.util.Map;
3539
import java.util.Optional;
3640
import java.util.Properties;
3741
import org.bson.BsonDocument;
3842
import org.bson.BsonTimestamp;
43+
import org.bson.conversions.Bson;
3944
import org.junit.jupiter.api.BeforeEach;
4045
import org.junit.jupiter.api.Test;
4146

@@ -135,13 +140,17 @@ void testIsResumeTokenValid() {
135140
final MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> mongoChangeStreamCursor =
136141
mock(MongoChangeStreamCursor.class);
137142
final MongoClient mongoClient = mock(MongoClient.class);
143+
final MongoDatabase mongoDatabase = mock(MongoDatabase.class);
138144

139145
when(mongoChangeStreamCursor.getResumeToken()).thenReturn(resumeToken);
140146
when(changeStreamIterable.cursor()).thenReturn(mongoChangeStreamCursor);
141147
when(changeStreamIterable.resumeAfter(resumeToken)).thenReturn(changeStreamIterable);
142-
when(mongoClient.watch(BsonDocument.class)).thenReturn(changeStreamIterable);
148+
when(mongoClient.getDatabase(DATABASE)).thenReturn(mongoDatabase);
149+
final List<Bson> pipeline = Collections.singletonList(Aggregates.match(
150+
Filters.in("ns.coll", List.of("test-collection"))));
151+
when(mongoDatabase.watch(pipeline, BsonDocument.class)).thenReturn(changeStreamIterable);
143152

144-
assertTrue(mongoDbDebeziumStateUtil.isValidResumeToken(resumeToken, mongoClient));
153+
assertTrue(mongoDbDebeziumStateUtil.isValidResumeToken(resumeToken, mongoClient, DATABASE, CONFIGURED_CATALOG));
145154
}
146155

147156
@Test
@@ -151,14 +160,19 @@ void testIsResumeTokenInvalid() {
151160
final ChangeStreamIterable<BsonDocument> changeStreamIterable = mock(ChangeStreamIterable.class);
152161
final MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> mongoChangeStreamCursor =
153162
mock(MongoChangeStreamCursor.class);
163+
154164
final MongoClient mongoClient = mock(MongoClient.class);
165+
final MongoDatabase mongoDatabase = mock(MongoDatabase.class);
155166

156167
when(mongoChangeStreamCursor.getResumeToken()).thenReturn(resumeToken);
157168
when(changeStreamIterable.cursor()).thenThrow(new MongoCommandException(new BsonDocument(), new ServerAddress()));
158169
when(changeStreamIterable.resumeAfter(resumeToken)).thenReturn(changeStreamIterable);
159-
when(mongoClient.watch(BsonDocument.class)).thenReturn(changeStreamIterable);
170+
when(mongoClient.getDatabase(DATABASE)).thenReturn(mongoDatabase);
171+
final List<Bson> pipeline = Collections.singletonList(Aggregates.match(
172+
Filters.in("ns.coll", List.of("test-collection"))));
173+
when(mongoDatabase.watch(pipeline, BsonDocument.class)).thenReturn(changeStreamIterable);
160174

161-
assertFalse(mongoDbDebeziumStateUtil.isValidResumeToken(resumeToken, mongoClient));
175+
assertFalse(mongoDbDebeziumStateUtil.isValidResumeToken(resumeToken, mongoClient, DATABASE, CONFIGURED_CATALOG));
162176
}
163177

164178
}

docs/integrations/sources/mongodb-v2.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc
221221

222222
| Version | Date | Pull Request | Subject |
223223
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
224-
| 1.3.6 | 2024-04-24 | [37525](https://github.com/airbytehq/airbyte/pull/37525) | Do not send estimate trace if we do not have data. |
224+
| 1.3.7 | 2024-04-24 | [37557](https://github.com/airbytehq/airbyte/pull/37557) | Fix bug in resume token validity check. |
225+
| 1.3.6 | 2024-04-24 | [37525](https://github.com/airbytehq/airbyte/pull/37525) | Internal refactor. |
225226
| 1.3.5 | 2024-04-22 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Do not send estimate trace if we do not have data. |
226227
| 1.3.4 | 2024-04-16 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Populate null values in airbyte record messages. |
227228
| 1.3.3 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadata definition. |

0 commit comments

Comments
 (0)