Skip to content

Commit da46171

Browse files
✨ Source MongoDB Internal POC: CDC State Handling (#29763)
Co-authored-by: jdpgrailsdev <[email protected]> Co-authored-by: subodh <[email protected]>
1 parent ae7286a commit da46171

File tree

11 files changed

+502
-94
lines changed

11 files changed

+502
-94
lines changed

airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbResumeTokenHelperTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ void testTimestampExtractionFromEvent() throws IOException {
4949
final BsonTimestamp expectedTimestamp = new BsonTimestamp(timestampSec, 2);
5050
final String changeEventJson = MoreResources.readResource("mongodb/change_event.json");
5151
final JsonNode changeEvent = Jsons.deserialize(changeEventJson);
52-
5352
final BsonTimestamp timestamp = MongoDbResumeTokenHelper.extractTimestampFromEvent(changeEvent);
5453
assertNotNull(timestamp);
5554
assertEquals(expectedTimestamp, timestamp);

airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSource.java

Lines changed: 8 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,37 +9,30 @@
99
import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.ID_FIELD;
1010

1111
import com.fasterxml.jackson.databind.JsonNode;
12-
import com.google.common.annotations.VisibleForTesting;
1312
import com.mongodb.client.MongoClient;
1413
import com.mongodb.client.MongoDatabase;
1514
import com.mongodb.client.model.Filters;
1615
import com.mongodb.client.model.Projections;
1716
import com.mongodb.client.model.Sorts;
1817
import com.mongodb.connection.ClusterType;
19-
import io.airbyte.commons.json.Jsons;
2018
import io.airbyte.commons.util.AutoCloseableIterator;
2119
import io.airbyte.commons.util.AutoCloseableIterators;
2220
import io.airbyte.integrations.BaseConnector;
2321
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
2422
import io.airbyte.integrations.base.IntegrationRunner;
2523
import io.airbyte.integrations.base.Source;
24+
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager;
25+
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStreamState;
2626
import io.airbyte.protocol.models.v0.AirbyteCatalog;
2727
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
2828
import io.airbyte.protocol.models.v0.AirbyteMessage;
29-
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
30-
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
3129
import io.airbyte.protocol.models.v0.AirbyteStream;
3230
import io.airbyte.protocol.models.v0.CatalogHelpers;
3331
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
34-
import io.airbyte.protocol.models.v0.StreamDescriptor;
3532
import io.airbyte.protocol.models.v0.SyncMode;
3633
import java.time.Instant;
37-
import java.util.Arrays;
3834
import java.util.List;
39-
import java.util.Map;
40-
import java.util.Map.Entry;
4135
import java.util.Optional;
42-
import java.util.stream.Collectors;
4336
import org.bson.BsonDocument;
4437
import org.bson.conversions.Bson;
4538
import org.bson.types.ObjectId;
@@ -50,9 +43,6 @@ public class MongoDbSource extends BaseConnector implements Source {
5043

5144
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);
5245

53-
/** Helper class for holding a collection-name and stream state together */
54-
private record CollectionNameState(Optional<String> name, Optional<MongodbStreamState> state) {}
55-
5646
public static void main(final String[] args) throws Exception {
5747
final Source source = new MongoDbSource();
5848
LOGGER.info("starting source: {}", MongoDbSource.class);
@@ -106,15 +96,14 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
10696
final JsonNode state) {
10797
final var databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText();
10898
final var emittedAt = Instant.now();
109-
110-
final var states = convertState(state);
99+
final var stateManager = MongoDbStateManager.createStateManager(state);
111100
final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config);
112101

113102
try {
114103
final var database = mongoClient.getDatabase(databaseName);
115104
// TODO treat INCREMENTAL and FULL_REFRESH differently?
116105
return AutoCloseableIterators.appendOnClose(AutoCloseableIterators.concatWithEagerClose(
117-
convertCatalogToIterators(catalog, states, database, emittedAt),
106+
convertCatalogToIterators(catalog, stateManager, database, emittedAt),
118107
AirbyteTraceMessageUtility::emitStreamStatusTrace),
119108
mongoClient::close);
120109
} catch (final Exception e) {
@@ -123,41 +112,12 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
123112
}
124113
}
125114

126-
/**
127-
* Converts the JsonNode into a map of mongodb collection names to stream states.
128-
*/
129-
@VisibleForTesting
130-
protected Map<String, MongodbStreamState> convertState(final JsonNode state) {
131-
// I'm unsure if the JsonNode data is going to be a singular AirbyteStateMessage or an array of
132-
// AirbyteStateMessages.
133-
// So this currently handles both cases, converting the singular message into a list of messages,
134-
// leaving the list of messages
135-
// as a list of messages, or returning an empty list.
136-
final List<AirbyteStateMessage> states = Jsons.tryObject(state, AirbyteStateMessage.class)
137-
.map(List::of)
138-
.orElseGet(() -> Jsons.tryObject(state, AirbyteStateMessage[].class)
139-
.map(Arrays::asList)
140-
.orElse(List.of()));
141-
142-
// TODO add namespace support?
143-
return states.stream()
144-
.filter(s -> s.getType() == AirbyteStateType.STREAM)
145-
.map(s -> new CollectionNameState(
146-
Optional.ofNullable(s.getStream().getStreamDescriptor()).map(StreamDescriptor::getName),
147-
Jsons.tryObject(s.getStream().getStreamState(), MongodbStreamState.class)))
148-
// only keep states that could be parsed
149-
.filter(p -> p.name.isPresent() && p.state.isPresent())
150-
.collect(Collectors.toMap(
151-
p -> p.name.orElseThrow(),
152-
p -> p.state.orElseThrow()));
153-
}
154-
155115
/**
156116
* Converts the streams in the catalog into a list of AutoCloseableIterators.
157117
*/
158118
private List<AutoCloseableIterator<AirbyteMessage>> convertCatalogToIterators(
159119
final ConfiguredAirbyteCatalog catalog,
160-
final Map<String, MongodbStreamState> states,
120+
final MongoDbStateManager stateManager,
161121
final MongoDatabase database,
162122
final Instant emittedAt) {
163123
return catalog.getStreams()
@@ -175,12 +135,8 @@ private List<AutoCloseableIterator<AirbyteMessage>> convertCatalogToIterators(
175135
final var fields = Projections.fields(Projections.include(CatalogHelpers.getTopLevelFieldNames(airbyteStream).stream().toList()));
176136

177137
// find the existing state, if there is one, for this stream
178-
final Optional<MongodbStreamState> existingState = states.entrySet().stream()
179-
// look only for states that match this stream's name
180-
// TODO add namespace support
181-
.filter(state -> state.getKey().equals(airbyteStream.getStream().getName()))
182-
.map(Entry::getValue)
183-
.findFirst();
138+
final Optional<MongoDbStreamState> existingState =
139+
stateManager.getStreamState(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
184140

185141
// The filter determines the starting point of this iterator based on the state of this collection.
186142
// If a state exists, it will use that state to create a query akin to
@@ -198,7 +154,7 @@ private List<AutoCloseableIterator<AirbyteMessage>> convertCatalogToIterators(
198154
.sort(Sorts.ascending(ID_FIELD))
199155
.cursor();
200156

201-
final var stateIterator = new MongoDbStateIterator(cursor, airbyteStream, existingState, emittedAt, CHECKPOINT_INTERVAL);
157+
final var stateIterator = new MongoDbStateIterator(cursor, stateManager, airbyteStream, emittedAt, CHECKPOINT_INTERVAL);
202158
return AutoCloseableIterators.fromIterator(stateIterator, cursor::close, null);
203159
})
204160
.toList();

airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbStateIterator.java

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,17 @@
66

77
import com.mongodb.MongoException;
88
import com.mongodb.client.MongoCursor;
9-
import io.airbyte.commons.json.Jsons;
109
import io.airbyte.db.mongodb.MongoUtils;
10+
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager;
11+
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStreamState;
1112
import io.airbyte.protocol.models.v0.AirbyteMessage;
1213
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
1314
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
14-
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
15-
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
16-
import io.airbyte.protocol.models.v0.AirbyteStreamState;
1715
import io.airbyte.protocol.models.v0.CatalogHelpers;
1816
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
19-
import io.airbyte.protocol.models.v0.StreamDescriptor;
2017
import java.time.Instant;
2118
import java.util.Iterator;
2219
import java.util.List;
23-
import java.util.Optional;
2420
import org.bson.Document;
2521
import org.slf4j.Logger;
2622
import org.slf4j.LoggerFactory;
@@ -36,6 +32,7 @@ class MongoDbStateIterator implements Iterator<AirbyteMessage> {
3632
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStateIterator.class);
3733

3834
private final MongoCursor<Document> iter;
35+
private final MongoDbStateManager stateManager;
3936
private final ConfiguredAirbyteStream stream;
4037
private final List<String> fields;
4138
private final Instant emittedAt;
@@ -60,23 +57,25 @@ class MongoDbStateIterator implements Iterator<AirbyteMessage> {
6057
/**
6158
* Constructor.
6259
*
63-
* @param iter MongoCursor that iterates over Mongo documents
60+
* @param iter {@link MongoCursor} that iterates over Mongo documents
61+
* @param stateManager {@link MongoDbStateManager} that manages global and per-stream state
6462
* @param stream the stream that this iterator represents
65-
* @param state the initial state of this stream
6663
* @param emittedAt when this iterator was started
6764
* @param checkpointInterval how often a state message should be emitted.
6865
*/
6966
MongoDbStateIterator(final MongoCursor<Document> iter,
67+
final MongoDbStateManager stateManager,
7068
final ConfiguredAirbyteStream stream,
71-
Optional<MongodbStreamState> state,
7269
final Instant emittedAt,
7370
final int checkpointInterval) {
7471
this.iter = iter;
72+
this.stateManager = stateManager;
7573
this.stream = stream;
7674
this.checkpointInterval = checkpointInterval;
7775
this.emittedAt = emittedAt;
78-
fields = CatalogHelpers.getTopLevelFieldNames(stream).stream().toList();
79-
lastId = state.map(MongodbStreamState::id).orElse(null);
76+
this.fields = CatalogHelpers.getTopLevelFieldNames(stream).stream().toList();
77+
this.lastId =
78+
stateManager.getStreamState(stream.getStream().getName(), stream.getStream().getNamespace()).map(MongoDbStreamState::id).orElse(null);
8079
}
8180

8281
@Override
@@ -103,20 +102,15 @@ public AirbyteMessage next() {
103102
if ((count > 0 && count % checkpointInterval == 0) || finalStateNext) {
104103
count = 0;
105104

106-
final var streamState = new AirbyteStreamState()
107-
.withStreamDescriptor(new StreamDescriptor()
108-
.withName(stream.getStream().getName())
109-
.withNamespace(stream.getStream().getNamespace()));
110105
if (lastId != null) {
111106
// TODO add type support in here once more than ObjectId fields are supported
112-
streamState.withStreamState(Jsons.jsonNode(new MongodbStreamState(lastId)));
107+
stateManager.updateStreamState(stream.getStream().getName(),
108+
stream.getStream().getNamespace(), new MongoDbStreamState(lastId));
113109
}
114110

115-
final var stateMessage = new AirbyteStateMessage()
116-
.withType(AirbyteStateType.STREAM)
117-
.withStream(streamState);
118-
119-
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
111+
return new AirbyteMessage()
112+
.withType(Type.STATE)
113+
.withState(stateManager.toState());
120114
}
121115

122116
count++;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.mongodb.internal.cdc;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
9+
/**
10+
* Represents the global CDC state that is used by Debezium as an offset.
11+
*
12+
* @param state The Debezium offset state as a {@link JsonNode}.
13+
*/
14+
public record MongoDbCdcState(JsonNode state) {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.mongodb.internal.cdc;
6+
7+
import io.airbyte.commons.json.Jsons;
8+
import io.airbyte.integrations.debezium.CdcStateHandler;
9+
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager;
10+
import io.airbyte.protocol.models.v0.AirbyteMessage;
11+
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
12+
import java.util.Map;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
/**
17+
* Implementation of the {@link CdcStateHandler} that handles saving the CDC offset as Airbyte state
18+
* for MongoDB.
19+
*/
20+
public class MongoDbCdcStateHandler implements CdcStateHandler {
21+
22+
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbCdcStateHandler.class);
23+
24+
private final MongoDbStateManager stateManager;
25+
26+
public MongoDbCdcStateHandler(MongoDbStateManager stateManager) {
27+
this.stateManager = stateManager;
28+
}
29+
30+
@Override
31+
public AirbyteMessage saveState(final Map<String, String> offset, String dbHistory) {
32+
final MongoDbCdcState cdcState = new MongoDbCdcState(Jsons.jsonNode(offset));
33+
34+
LOGGER.info("Saving Debezium state {}...", cdcState);
35+
stateManager.updateCdcState(cdcState);
36+
37+
final AirbyteStateMessage stateMessage = stateManager.toState();
38+
return new AirbyteMessage().withType(AirbyteMessage.Type.STATE).withState(stateMessage);
39+
}
40+
41+
@Override
42+
public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() {
43+
throw new RuntimeException("Debezium is not used to carry out the snapshot of tables.");
44+
}
45+
46+
@Override
47+
public boolean isCdcCheckpointEnabled() {
48+
return false;
49+
}
50+
51+
}

0 commit comments

Comments
 (0)