Skip to content

✨ Source MongoDB Internal POC: CDC State Handling #29763

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f3f0ead
Add state management and handling for CDC integration
jdpgrailsdev Aug 23, 2023
3ab0fe2
Formatting
jdpgrailsdev Aug 23, 2023
4b5552f
Fix typo
jdpgrailsdev Aug 23, 2023
2f2fe53
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 23, 2023
f67aa27
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 23, 2023
9317ccf
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 23, 2023
9517f34
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 24, 2023
fca3fbc
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 24, 2023
b08b556
Automated Commit - Format and Process Resources Changes
jdpgrailsdev Aug 24, 2023
cbf5f09
Merge branch 'master' into jonathan/cdc-state-handler
jdpgrailsdev Aug 24, 2023
af7feba
Merge branch 'jonathan/cdc-metadata-injector' into jonathan/cdc-state…
jdpgrailsdev Aug 25, 2023
a8b2455
Merge branch 'jonathan/cdc-metadata-injector' into jonathan/cdc-state…
jdpgrailsdev Aug 25, 2023
291f6ff
PR feedback
jdpgrailsdev Aug 25, 2023
31726f3
Store offset map in CDC state
jdpgrailsdev Aug 25, 2023
ee781c1
Formatting
jdpgrailsdev Aug 25, 2023
28e4dc0
Merge branch 'jonathan/cdc-metadata-injector' into jonathan/cdc-state…
jdpgrailsdev Aug 25, 2023
971af01
disable checkpointing + saveStateAfterCompletionOfSnapshotOfNewStreams
subodh1810 Aug 25, 2023
9a13b5f
Merge branch 'jonathan/cdc-metadata-injector' into jonathan/cdc-state…
jdpgrailsdev Aug 28, 2023
8743788
Automated Commit - Formatting Changes
jdpgrailsdev Aug 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,18 @@ default void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record, fi
* @param source part of debezium record and contains the metadata about the record. We need to
* extract namespace out of this metadata and return Ref :
* https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
* @return the stream namespace extracted from the change event source.
*/
String namespace(JsonNode source);

/**
* As part of Airbyte record we need to add the name (e.g. table name)
*
* @param source part of debezium record and contains the metadata about the record. We need to
* extract namespace out of this metadata and return Ref :
* https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
* @return The stream name extracted from the change event source.
*/
String name(JsonNode source);

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static AirbyteMessage toAirbyteMessage(final ChangeEventWithMetadata even

final JsonNode data = formatDebeziumData(before, after, source, cdcMetadataInjector);
final String schemaName = cdcMetadataInjector.namespace(source);
final String streamName = source.get("table").asText();
final String streamName = cdcMetadataInjector.name(source);

final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage()
.withStream(streamName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public boolean reachedTargetPosition(final ChangeEventWithMetadata changeEventWi
return true;
} else {
final BsonTimestamp eventResumeTokenTimestamp =
MongoDbResumeTokenHelper.extractTimestamp(changeEventWithMetadata.eventValueAsJson());
MongoDbResumeTokenHelper.extractTimestampFromEvent(changeEventWithMetadata.eventValueAsJson());
boolean isEventResumeTokenAfter = resumeTokenTimestamp.compareTo(eventResumeTokenTimestamp) <= 0;
if (isEventResumeTokenAfter) {
LOGGER.info("Signalling close because record's event timestamp {} is after target event timestamp {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ public class MongoDbDebeziumConstants {
public static class ChangeEvent {

public static final String SOURCE = "source";

public static final String SOURCE_COLLECTION = SourceInfo.COLLECTION;
public static final String SOURCE_DB = "db";
public static final String SOURCE_ORDER = SourceInfo.ORDER;

public static final String SOURCE_RESUME_TOKEN = "resume_token";

public static final String SOURCE_SECONDS = SourceInfo.TIMESTAMP;

public static final String SOURCE_TIMESTAMP_MS = "ts_ms";

}
Expand All @@ -34,15 +32,10 @@ public static class ChangeEvent {
public static class OffsetState {

public static final String KEY_REPLICA_SET = SourceInfo.REPLICA_SET_NAME;

public static final String KEY_SERVER_ID = SourceInfo.SERVER_ID_KEY;

public static final String VALUE_INCREMENT = SourceInfo.ORDER;

public static final String VALUE_RESUME_TOKEN = "resume_token";

public static final String VALUE_SECONDS = SourceInfo.TIMESTAMP;

public static final String VALUE_TRANSACTION_ID = "transaction_id";

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,19 @@ public static BsonDocument getResumeToken(final MongoClient mongoClient) {
* @return The extracted timestamp
* @throws IllegalStateException if the timestamp could not be extracted from the change event.
*/
public static BsonTimestamp extractTimestamp(final JsonNode event) {
return Optional.ofNullable(event.get(MongoDbDebeziumConstants.ChangeEvent.SOURCE))
public static BsonTimestamp extractTimestampFromEvent(final JsonNode event) {
return extractTimestampFromSource(event.get(MongoDbDebeziumConstants.ChangeEvent.SOURCE));
}

/**
* Extracts the timestamp from a Debezium MongoDB change event source object.
*
* @param source The Debezium MongoDB change event source object as JSON.
* @return The extracted timestamp
* @throws IllegalStateException if the timestamp could not be extracted from the change event.
*/
public static BsonTimestamp extractTimestampFromSource(final JsonNode source) {
return Optional.ofNullable(source)
.flatMap(MongoDbResumeTokenHelper::createTimestampFromSource)
.orElseThrow(() -> new IllegalStateException("Could not find timestamp"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public String namespace(final JsonNode source) {
return source.get("schema").asText();
}

@Override
public String name(JsonNode source) {
return source.get("table").asText();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,40 @@ void testRetrievingResumeToken() {
}

@Test
void testTimestampExtraction() throws IOException {
void testTimestampExtractionFromEvent() throws IOException {
final int timestampSec = Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(1692651270000L)).intValue();
final BsonTimestamp expectedTimestamp = new BsonTimestamp(timestampSec, 2);
final String changeEventJson = MoreResources.readResource("mongodb/change_event.json");
final JsonNode changeEvent = Jsons.deserialize(changeEventJson);
final BsonTimestamp timestamp = MongoDbResumeTokenHelper.extractTimestampFromEvent(changeEvent);
assertNotNull(timestamp);
assertEquals(expectedTimestamp, timestamp);
}

@Test
void testTimestampExtractionFromEventSource() throws IOException {
final int timestampSec = Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(1692651270000L)).intValue();
final BsonTimestamp expectedTimestamp = new BsonTimestamp(timestampSec, 2);
final String changeEventJson = MoreResources.readResource("mongodb/change_event.json");
final JsonNode changeEvent = Jsons.deserialize(changeEventJson);

final BsonTimestamp timestamp = MongoDbResumeTokenHelper.extractTimestamp(changeEvent);
final BsonTimestamp timestamp = MongoDbResumeTokenHelper
.extractTimestampFromSource(changeEvent.get(MongoDbDebeziumConstants.ChangeEvent.SOURCE));
assertNotNull(timestamp);
assertEquals(expectedTimestamp, timestamp);
}

@Test
void testTimestampExtractionSourceNotPresent() {
void testTimestampExtractionFromEventSourceNotPresent() {
final JsonNode changeEvent = Jsons.deserialize("{}");
assertThrows(IllegalStateException.class, () -> MongoDbResumeTokenHelper.extractTimestamp(changeEvent));
assertThrows(IllegalStateException.class, () -> MongoDbResumeTokenHelper.extractTimestampFromEvent(changeEvent));
assertThrows(IllegalStateException.class, () -> MongoDbResumeTokenHelper.extractTimestampFromSource(changeEvent));
}

@Test
void testTimestampExtractionTimestampNotPresent() {
final JsonNode changeEvent = Jsons.deserialize("{\"source\":{}}");
assertThrows(IllegalStateException.class, () -> MongoDbResumeTokenHelper.extractTimestamp(changeEvent));
assertThrows(IllegalStateException.class, () -> MongoDbResumeTokenHelper.extractTimestampFromEvent(changeEvent));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,30 @@
import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.ID_FIELD;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.connection.ClusterType;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager;
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStreamState;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
Expand All @@ -50,9 +43,6 @@ public class MongoDbSource extends BaseConnector implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);

/** Helper class for holding a collection-name and stream state together */
private record CollectionNameState(Optional<String> name, Optional<MongodbStreamState> state) {}

public static void main(final String[] args) throws Exception {
final Source source = new MongoDbSource();
LOGGER.info("starting source: {}", MongoDbSource.class);
Expand Down Expand Up @@ -106,15 +96,14 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
final JsonNode state) {
final var databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText();
final var emittedAt = Instant.now();

final var states = convertState(state);
final var stateManager = MongoDbStateManager.createStateManager(state);
final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config);

try {
final var database = mongoClient.getDatabase(databaseName);
// TODO treat INCREMENTAL and FULL_REFRESH differently?
return AutoCloseableIterators.appendOnClose(AutoCloseableIterators.concatWithEagerClose(
convertCatalogToIterators(catalog, states, database, emittedAt),
convertCatalogToIterators(catalog, stateManager, database, emittedAt),
AirbyteTraceMessageUtility::emitStreamStatusTrace),
mongoClient::close);
} catch (final Exception e) {
Expand All @@ -123,41 +112,12 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
}
}

/**
* Converts the JsonNode into a map of mongodb collection names to stream states.
*/
@VisibleForTesting
protected Map<String, MongodbStreamState> convertState(final JsonNode state) {
// I'm unsure if the JsonNode data is going to be a singular AirbyteStateMessage or an array of
// AirbyteStateMessages.
// So this currently handles both cases, converting the singular message into a list of messages,
// leaving the list of messages
// as a list of messages, or returning an empty list.
final List<AirbyteStateMessage> states = Jsons.tryObject(state, AirbyteStateMessage.class)
.map(List::of)
.orElseGet(() -> Jsons.tryObject(state, AirbyteStateMessage[].class)
.map(Arrays::asList)
.orElse(List.of()));

// TODO add namespace support?
return states.stream()
.filter(s -> s.getType() == AirbyteStateType.STREAM)
.map(s -> new CollectionNameState(
Optional.ofNullable(s.getStream().getStreamDescriptor()).map(StreamDescriptor::getName),
Jsons.tryObject(s.getStream().getStreamState(), MongodbStreamState.class)))
// only keep states that could be parsed
.filter(p -> p.name.isPresent() && p.state.isPresent())
.collect(Collectors.toMap(
p -> p.name.orElseThrow(),
p -> p.state.orElseThrow()));
}

/**
* Converts the streams in the catalog into a list of AutoCloseableIterators.
*/
private List<AutoCloseableIterator<AirbyteMessage>> convertCatalogToIterators(
final ConfiguredAirbyteCatalog catalog,
final Map<String, MongodbStreamState> states,
final MongoDbStateManager stateManager,
final MongoDatabase database,
final Instant emittedAt) {
return catalog.getStreams()
Expand All @@ -175,12 +135,8 @@ private List<AutoCloseableIterator<AirbyteMessage>> convertCatalogToIterators(
final var fields = Projections.fields(Projections.include(CatalogHelpers.getTopLevelFieldNames(airbyteStream).stream().toList()));

// find the existing state, if there is one, for this steam
final Optional<MongodbStreamState> existingState = states.entrySet().stream()
// look only for states that match this stream's name
// TODO add namespace support
.filter(state -> state.getKey().equals(airbyteStream.getStream().getName()))
.map(Entry::getValue)
.findFirst();
final Optional<MongoDbStreamState> existingState =
stateManager.getStreamState(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());

// The filter determines the starting point of this iterator based on the state of this collection.
// If a state exists, it will use that state to create a query akin to
Expand All @@ -198,7 +154,7 @@ private List<AutoCloseableIterator<AirbyteMessage>> convertCatalogToIterators(
.sort(Sorts.ascending(ID_FIELD))
.cursor();

final var stateIterator = new MongoDbStateIterator(cursor, airbyteStream, existingState, emittedAt, CHECKPOINT_INTERVAL);
final var stateIterator = new MongoDbStateIterator(cursor, stateManager, airbyteStream, emittedAt, CHECKPOINT_INTERVAL);
return AutoCloseableIterators.fromIterator(stateIterator, cursor::close, null);
})
.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,17 @@

import com.mongodb.MongoException;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.mongodb.MongoUtils;
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager;
import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStreamState;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,6 +32,7 @@ class MongoDbStateIterator implements Iterator<AirbyteMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStateIterator.class);

private final MongoCursor<Document> iter;
private final MongoDbStateManager stateManager;
private final ConfiguredAirbyteStream stream;
private final List<String> fields;
private final Instant emittedAt;
Expand All @@ -60,23 +57,24 @@ class MongoDbStateIterator implements Iterator<AirbyteMessage> {
/**
* Constructor.
*
* @param iter MongoCursor that iterates over Mongo documents
* @param iter {@link MongoCursor} that iterates over Mongo documents
* @param stateManager {@link MongoDbStateManager} that manages global and per-stream state
* @param stream the stream that this iterator represents
* @param state the initial state of this stream
* @param emittedAt when this iterator was started
* @param checkpointInterval how often a state message should be emitted.
*/
MongoDbStateIterator(final MongoCursor<Document> iter,
final MongoDbStateManager stateManager,
final ConfiguredAirbyteStream stream,
Optional<MongodbStreamState> state,
final Instant emittedAt,
final int checkpointInterval) {
this.iter = iter;
this.stateManager = stateManager;
this.stream = stream;
this.checkpointInterval = checkpointInterval;
this.emittedAt = emittedAt;
fields = CatalogHelpers.getTopLevelFieldNames(stream).stream().toList();
lastId = state.map(MongodbStreamState::id).orElse(null);
lastId = stateManager.getStreamState(stream.getStream().getName(), stream.getStream().getNamespace()).map(MongoDbStreamState::id).orElse(null);
}

@Override
Expand All @@ -103,20 +101,15 @@ public AirbyteMessage next() {
if ((count > 0 && count % checkpointInterval == 0) || finalStateNext) {
count = 0;

final var streamState = new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor()
.withName(stream.getStream().getName())
.withNamespace(stream.getStream().getNamespace()));
if (lastId != null) {
// TODO add type support in here once more than ObjectId fields are supported
streamState.withStreamState(Jsons.jsonNode(new MongodbStreamState(lastId)));
stateManager.updateStreamState(stream.getStream().getName(),
stream.getStream().getNamespace(), new MongoDbStreamState(lastId));
}

final var stateMessage = new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(streamState);

return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
return new AirbyteMessage()
.withType(Type.STATE)
.withState(stateManager.toState());
}

count++;
Expand Down
Loading