diff --git a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbResumeTokenHelperTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbResumeTokenHelperTest.java index 9198247817c3e..e48ce614da490 100644 --- a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbResumeTokenHelperTest.java +++ b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/mongodb/MongoDbResumeTokenHelperTest.java @@ -49,7 +49,6 @@ void testTimestampExtractionFromEvent() throws IOException { final BsonTimestamp expectedTimestamp = new BsonTimestamp(timestampSec, 2); final String changeEventJson = MoreResources.readResource("mongodb/change_event.json"); final JsonNode changeEvent = Jsons.deserialize(changeEventJson); - final BsonTimestamp timestamp = MongoDbResumeTokenHelper.extractTimestampFromEvent(changeEvent); assertNotNull(timestamp); assertEquals(expectedTimestamp, timestamp); diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSource.java index 47f371d539c8a..81849a07a1863 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSource.java @@ -9,37 +9,30 @@ import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.ID_FIELD; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.annotations.VisibleForTesting; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.Projections; import com.mongodb.client.model.Sorts; import com.mongodb.connection.ClusterType; -import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager; +import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStreamState; import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteStateMessage; -import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.StreamDescriptor; import io.airbyte.protocol.models.v0.SyncMode; import java.time.Instant; -import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; -import java.util.stream.Collectors; import org.bson.BsonDocument; import org.bson.conversions.Bson; import org.bson.types.ObjectId; @@ -50,9 +43,6 @@ public class MongoDbSource extends BaseConnector implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class); - /** Helper class for holding a collection-name and stream state together */ - private record CollectionNameState(Optional name, Optional state) {} - public static void main(final String[] args) throws Exception { final Source source = new MongoDbSource(); LOGGER.info("starting source: {}", MongoDbSource.class); @@ -106,15 +96,14 @@ public AutoCloseableIterator read(final JsonNode config, final JsonNode state) { final var databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText(); final var emittedAt = Instant.now(); - - final var states = convertState(state); + final var stateManager = MongoDbStateManager.createStateManager(state); final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config); try { final var database = mongoClient.getDatabase(databaseName); // TODO treat INCREMENTAL and FULL_REFRESH differently? return AutoCloseableIterators.appendOnClose(AutoCloseableIterators.concatWithEagerClose( - convertCatalogToIterators(catalog, states, database, emittedAt), + convertCatalogToIterators(catalog, stateManager, database, emittedAt), AirbyteTraceMessageUtility::emitStreamStatusTrace), mongoClient::close); } catch (final Exception e) { @@ -123,41 +112,12 @@ public AutoCloseableIterator read(final JsonNode config, } } - /** - * Converts the JsonNode into a map of mongodb collection names to stream states. - */ - @VisibleForTesting - protected Map convertState(final JsonNode state) { - // I'm unsure if the JsonNode data is going to be a singular AirbyteStateMessage or an array of - // AirbyteStateMessages. - // So this currently handles both cases, converting the singular message into a list of messages, - // leaving the list of messages - // as a list of messages, or returning an empty list. - final List states = Jsons.tryObject(state, AirbyteStateMessage.class) - .map(List::of) - .orElseGet(() -> Jsons.tryObject(state, AirbyteStateMessage[].class) - .map(Arrays::asList) - .orElse(List.of())); - - // TODO add namespace support? - return states.stream() - .filter(s -> s.getType() == AirbyteStateType.STREAM) - .map(s -> new CollectionNameState( - Optional.ofNullable(s.getStream().getStreamDescriptor()).map(StreamDescriptor::getName), - Jsons.tryObject(s.getStream().getStreamState(), MongodbStreamState.class))) - // only keep states that could be parsed - .filter(p -> p.name.isPresent() && p.state.isPresent()) - .collect(Collectors.toMap( - p -> p.name.orElseThrow(), - p -> p.state.orElseThrow())); - } - /** * Converts the streams in the catalog into a list of AutoCloseableIterators. */ private List> convertCatalogToIterators( final ConfiguredAirbyteCatalog catalog, - final Map states, + final MongoDbStateManager stateManager, final MongoDatabase database, final Instant emittedAt) { return catalog.getStreams() @@ -175,12 +135,8 @@ private List> convertCatalogToIterators( final var fields = Projections.fields(Projections.include(CatalogHelpers.getTopLevelFieldNames(airbyteStream).stream().toList())); // find the existing state, if there is one, for this steam - final Optional existingState = states.entrySet().stream() - // look only for states that match this stream's name - // TODO add namespace support - .filter(state -> state.getKey().equals(airbyteStream.getStream().getName())) - .map(Entry::getValue) - .findFirst(); + final Optional existingState = + stateManager.getStreamState(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); // The filter determines the starting point of this iterator based on the state of this collection. // If a state exists, it will use that state to create a query akin to @@ -198,7 +154,7 @@ private List> convertCatalogToIterators( .sort(Sorts.ascending(ID_FIELD)) .cursor(); - final var stateIterator = new MongoDbStateIterator(cursor, airbyteStream, existingState, emittedAt, CHECKPOINT_INTERVAL); + final var stateIterator = new MongoDbStateIterator(cursor, stateManager, airbyteStream, emittedAt, CHECKPOINT_INTERVAL); return AutoCloseableIterators.fromIterator(stateIterator, cursor::close, null); }) .toList(); diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbStateIterator.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbStateIterator.java index 4dce1c39aadf7..63ba46a976ad0 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbStateIterator.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoDbStateIterator.java @@ -6,21 +6,17 @@ import com.mongodb.MongoException; import com.mongodb.client.MongoCursor; -import io.airbyte.commons.json.Jsons; import io.airbyte.db.mongodb.MongoUtils; +import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager; +import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStreamState; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteStateMessage; -import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; -import io.airbyte.protocol.models.v0.AirbyteStreamState; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.StreamDescriptor; import java.time.Instant; import java.util.Iterator; import java.util.List; -import java.util.Optional; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +32,7 @@ class MongoDbStateIterator implements Iterator { private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStateIterator.class); private final MongoCursor iter; + private final MongoDbStateManager stateManager; private final ConfiguredAirbyteStream stream; private final List fields; private final Instant emittedAt; @@ -60,23 +57,25 @@ class MongoDbStateIterator implements Iterator { /** * Constructor. * - * @param iter MongoCursor that iterates over Mongo documents + * @param iter {@link MongoCursor} that iterates over Mongo documents + * @param stateManager {@link MongoDbStateManager} that manages global and per-stream state * @param stream the stream that this iterator represents - * @param state the initial state of this stream * @param emittedAt when this iterator was started * @param checkpointInterval how often a state message should be emitted. */ MongoDbStateIterator(final MongoCursor iter, + final MongoDbStateManager stateManager, final ConfiguredAirbyteStream stream, - Optional state, final Instant emittedAt, final int checkpointInterval) { this.iter = iter; + this.stateManager = stateManager; this.stream = stream; this.checkpointInterval = checkpointInterval; this.emittedAt = emittedAt; - fields = CatalogHelpers.getTopLevelFieldNames(stream).stream().toList(); - lastId = state.map(MongodbStreamState::id).orElse(null); + this.fields = CatalogHelpers.getTopLevelFieldNames(stream).stream().toList(); + this.lastId = + stateManager.getStreamState(stream.getStream().getName(), stream.getStream().getNamespace()).map(MongoDbStreamState::id).orElse(null); } @Override @@ -103,20 +102,15 @@ public AirbyteMessage next() { if ((count > 0 && count % checkpointInterval == 0) || finalStateNext) { count = 0; - final var streamState = new AirbyteStreamState() - .withStreamDescriptor(new StreamDescriptor() - .withName(stream.getStream().getName()) - .withNamespace(stream.getStream().getNamespace())); if (lastId != null) { // TODO add type support in here once more than ObjectId fields are supported - streamState.withStreamState(Jsons.jsonNode(new MongodbStreamState(lastId))); + stateManager.updateStreamState(stream.getStream().getName(), + stream.getStream().getNamespace(), new MongoDbStreamState(lastId)); } - final var stateMessage = new AirbyteStateMessage() - .withType(AirbyteStateType.STREAM) - .withStream(streamState); - - return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); + return new AirbyteMessage() + .withType(Type.STATE) + .withState(stateManager.toState()); } count++; diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/cdc/MongoDbCdcState.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/cdc/MongoDbCdcState.java new file mode 100644 index 0000000000000..3653eae106cf5 --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/cdc/MongoDbCdcState.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mongodb.internal.cdc; + +import com.fasterxml.jackson.databind.JsonNode; + +/** + * Represents the global CDC state that is used by Debezium as an offset. + * + * @param state The Debezium offset state as a {@link JsonNode}. + */ +public record MongoDbCdcState(JsonNode state) {} diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/cdc/MongoDbCdcStateHandler.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/cdc/MongoDbCdcStateHandler.java new file mode 100644 index 0000000000000..de0d62a118793 --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/cdc/MongoDbCdcStateHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mongodb.internal.cdc; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.debezium.CdcStateHandler; +import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the {@link CdcStateHandler} that handles saving the CDC offset as Airbyte state + * for MongoDB. + */ +public class MongoDbCdcStateHandler implements CdcStateHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbCdcStateHandler.class); + + private final MongoDbStateManager stateManager; + + public MongoDbCdcStateHandler(MongoDbStateManager stateManager) { + this.stateManager = stateManager; + } + + @Override + public AirbyteMessage saveState(final Map offset, String dbHistory) { + final MongoDbCdcState cdcState = new MongoDbCdcState(Jsons.jsonNode(offset)); + + LOGGER.info("Saving Debezium state {}...", cdcState); + stateManager.updateCdcState(cdcState); + + final AirbyteStateMessage stateMessage = stateManager.toState(); + return new AirbyteMessage().withType(AirbyteMessage.Type.STATE).withState(stateMessage); + } + + @Override + public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() { + throw new RuntimeException("Debezium is not used to carry out the snapshot of tables."); + } + + @Override + public boolean isCdcCheckpointEnabled() { + return false; + } + +} diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbStateManager.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbStateManager.java new file mode 100644 index 0000000000000..fec09b4e022e9 --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbStateManager.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mongodb.internal.state; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.mongodb.internal.cdc.MongoDbCdcState; +import io.airbyte.protocol.models.v0.AirbyteGlobalState; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.AirbyteStreamState; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A state manager for MongoDB CDC syncs. + */ +public class MongoDbStateManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStateManager.class); + + /** + * The global CDC state. + */ + private MongoDbCdcState cdcState; + + /** + * Map of streams (name/namespace tuple) to the current stream state information stored in the + * state. + */ + private final Map pairToStreamState = new HashMap<>(); + + /** + * Creates a new {@link MongoDbStateManager} primed with the provided initial state. + * + * @param initialState The initial state to be stored in the state manager. + * @return A new {@link MongoDbStateManager} + */ + public static MongoDbStateManager createStateManager(final JsonNode initialState) { + final MongoDbStateManager stateManager = new MongoDbStateManager(); + + if (initialState != null) { + LOGGER.info("Initial state {}", initialState); + final List stateMessages = deserializeState(initialState); + if (!stateMessages.isEmpty()) { + if (stateMessages.size() == 1) { + final AirbyteStateMessage stateMessage = stateMessages.get(0); + stateManager.updateCdcState(Jsons.object(stateMessage.getGlobal().getSharedState(), MongoDbCdcState.class)); + stateMessage.getGlobal().getStreamStates().forEach(s -> { + stateManager.updateStreamState(s.getStreamDescriptor().getName(), s.getStreamDescriptor().getNamespace(), + Jsons.object(s.getStreamState(), MongoDbStreamState.class)); + }); + } else { + throw new IllegalStateException("The state contains multiple message, but only 1 is expected."); + } + } + } + return stateManager; + } + + private static List deserializeState(final JsonNode initialState) { + try { + return Jsons.object(initialState, new TypeReference<>() {}); + } catch (final IllegalArgumentException e) { + LOGGER.debug("Failed to deserialize initial state {}.", initialState, e); + return List.of(); + } + } + + /** + * Creates a new {@link MongoDbStateManager} instance. This constructor should not be called + * directly. Instead, use {@link #createStateManager(JsonNode)}. + */ + private MongoDbStateManager() {} + + /** + * Returns the global, CDC state stored by the manager. + * + * @return A {@link MongoDbCdcState} instance that represents the global state. + */ + public MongoDbCdcState getCdcState() { + return cdcState; + } + + /** + * Returns the current stream state for the given stream. + * + * @param streamName The name of the stream. + * @param streamNamespace The namespace of the stream. + * @return The {@link MongoDbStreamState} associated with the stream or an empty {@link Optional} if + * the stream is not currently tracked by the manager. + */ + public Optional getStreamState(final String streamName, final String streamNamespace) { + final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair = new AirbyteStreamNameNamespacePair(streamName, streamNamespace); + return Optional.ofNullable(pairToStreamState.get(airbyteStreamNameNamespacePair)); + } + + /** + * Updates the global, CDC state tracked by the manager. + * + * @param cdcState The new global, CDC state as an {@link MongoDbCdcState} instance. + */ + public void updateCdcState(final MongoDbCdcState cdcState) { + this.cdcState = cdcState; + } + + /** + * Updates the state associated with a stream. + * + * @param streamName The name of the stream. + * @param streamNamespace The namespace of the stream. + * @param streamState The new stream state. + */ + public void updateStreamState(final String streamName, final String streamNamespace, final MongoDbStreamState streamState) { + final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair = new AirbyteStreamNameNamespacePair(streamName, streamNamespace); + pairToStreamState.put(airbyteStreamNameNamespacePair, streamState); + } + + /** + * Generates an {@link AirbyteStateMessage} from the state tracked by this manager. The resulting + * state message contains a global state object with the CDC state as the "shared state" and the + * individual stream states as the "stream states". + * + * @return An {@link AirbyteStateMessage} that represents the state stored by the manager. + */ + public AirbyteStateMessage toState() { + // Populate global state + final AirbyteGlobalState globalState = new AirbyteGlobalState(); + // TODO For now, handle the null cdc state case. Once integrated with Debezium, we should + // never hit this scenario, as we should set the cdc state to the initial offset retrieved at start + // of the sync. + final MongoDbCdcState cdcState = getCdcState(); + globalState.setSharedState(cdcState != null ? Jsons.jsonNode(cdcState) : Jsons.emptyObject()); + globalState.setStreamStates(generateStreamStateList(pairToStreamState)); + + return new AirbyteStateMessage() + .withType(AirbyteStateMessage.AirbyteStateType.GLOBAL) + .withGlobal(globalState); + } + + private List generateStreamStateList(final Map pairToCursorInfoMap) { + return pairToCursorInfoMap.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(e -> generateStreamState(e.getKey(), e.getValue())) + .filter(s -> isValidStreamDescriptor(s.getStreamDescriptor())) + .collect(Collectors.toList()); + } + + private AirbyteStreamState generateStreamState(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair, + final MongoDbStreamState streamState) { + return new AirbyteStreamState() + .withStreamDescriptor( + new StreamDescriptor() + .withName(airbyteStreamNameNamespacePair.getName()) + .withNamespace(airbyteStreamNameNamespacePair.getNamespace())) + .withStreamState(Jsons.jsonNode(streamState)); + } + + /** + * Tests whether the provided {@link StreamDescriptor} is valid. A valid descriptor is defined as + * one that has a non-{@code null} name. + * + * See + * https://github.com/airbytehq/airbyte/blob/e63458fabb067978beb5eaa74d2bc130919b419f/docs/understanding-airbyte/airbyte-protocol.md + * for more details + * + * @param streamDescriptor A {@link StreamDescriptor} to be validated. + * @return {@code true} if the provided {@link StreamDescriptor} is valid or {@code false} if it is + * invalid. + */ + private boolean isValidStreamDescriptor(final StreamDescriptor streamDescriptor) { + if (streamDescriptor != null) { + return streamDescriptor.getName() != null; + } else { + return false; + } + } + +} diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongodbStreamState.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbStreamState.java similarity index 54% rename from airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongodbStreamState.java rename to airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbStreamState.java index 12b8dad1f1519..7977625832319 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongodbStreamState.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbStreamState.java @@ -2,11 +2,11 @@ * Copyright (c) 2023 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.mongodb.internal; +package io.airbyte.integrations.source.mongodb.internal.state; /* * TODO replace `isObjectId` with _id enum (ObjectId, String, etc.) */ -public record MongodbStreamState(String id) { // , boolean isObjectId) { +public record MongoDbStreamState(String id) { // , boolean isObjectId) { } diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java index c4a02f426d837..3f2532337b37e 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java @@ -191,16 +191,6 @@ void testIncrementalRefresh() throws Exception { // TODO implement } - @Test - void testConvertState() { - final var state1 = Jsons.deserialize( - "[{\"type\":\"STREAM\",\"stream\":{\"stream_descriptor\":{\"name\":\"test.acceptance_test1\"},\"stream_state\":{\"id\":\"64c0029d95ad260d69ef28a2\"}}}]"); - final var actual = source.convertState(state1); - assertTrue(actual.containsKey("test.acceptance_test1"), "missing test.acceptance_test1"); - assertEquals("64c0029d95ad260d69ef28a2", actual.get("test.acceptance_test1").id(), "id value does not match"); - - } - private static JsonNode createConfiguration(final Optional username, final Optional password) { final Map config = new HashMap<>(); final Map baseConfig = Map.of( diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbStateIteratorTest.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbStateIteratorTest.java index 040081f2b8b35..7b921a3095f80 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbStateIteratorTest.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbStateIteratorTest.java @@ -9,6 +9,8 @@ import com.mongodb.MongoException; import com.mongodb.client.MongoCursor; +import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStateManager; +import io.airbyte.integrations.source.mongodb.internal.state.MongoDbStreamState; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteMessage; @@ -20,7 +22,6 @@ import io.airbyte.protocol.models.v0.SyncMode; import java.time.Instant; import java.util.List; -import java.util.Optional; import org.bson.Document; import org.bson.types.ObjectId; import org.junit.jupiter.api.AfterEach; @@ -37,10 +38,12 @@ class MongoDbStateIteratorTest { @Mock private MongoCursor mongoCursor; private AutoCloseable closeable; + private MongoDbStateManager stateManager; @BeforeEach public void setup() { closeable = MockitoAnnotations.openMocks(this); + stateManager = MongoDbStateManager.createStateManager(null); } @AfterEach @@ -80,7 +83,7 @@ public Document answer(InvocationOnMock invocation) throws Throwable { final var stream = catalog().getStreams().stream().findFirst().orElseThrow(); - final var iter = new MongoDbStateIterator(mongoCursor, stream, Optional.empty(), Instant.now(), CHECKPOINT_INTERVAL); + final var iter = new MongoDbStateIterator(mongoCursor, stateManager, stream, Instant.now(), CHECKPOINT_INTERVAL); // with a batch size of 2, the MongoDbStateIterator should return the following after each // `hasNext`/`next` call: @@ -106,7 +109,7 @@ public Document answer(InvocationOnMock invocation) throws Throwable { assertEquals(Type.STATE, message.getType()); assertEquals( docs.get(1).get("_id").toString(), - message.getState().getStream().getStreamState().get("id").asText(), + message.getState().getGlobal().getStreamStates().get(0).getStreamState().get("id").asText(), "state id should match last record id"); assertTrue(iter.hasNext(), "alizarin crimson should be next"); @@ -119,7 +122,7 @@ public Document answer(InvocationOnMock invocation) throws Throwable { assertEquals(Type.STATE, message.getType()); assertEquals( docs.get(2).get("_id").toString(), - message.getState().getStream().getStreamState().get("id").asText(), + message.getState().getGlobal().getStreamStates().get(0).getStreamState().get("id").asText(), "state id should match last record id"); assertFalse(iter.hasNext(), "should have no more records"); @@ -138,7 +141,7 @@ void treatHasNextExceptionAsFalse() { final var stream = catalog().getStreams().stream().findFirst().orElseThrow(); - final var iter = new MongoDbStateIterator(mongoCursor, stream, Optional.empty(), Instant.now(), CHECKPOINT_INTERVAL); + final var iter = new MongoDbStateIterator(mongoCursor, stateManager, stream, Instant.now(), CHECKPOINT_INTERVAL); // with a batch size of 2, the MongoDbStateIterator should return the following after each // `hasNext`/`next` call: @@ -156,7 +159,7 @@ void treatHasNextExceptionAsFalse() { assertEquals(Type.STATE, message.getType()); assertEquals( docs.get(0).get("_id").toString(), - message.getState().getStream().getStreamState().get("id").asText(), + message.getState().getGlobal().getStreamStates().get(0).getStreamState().get("id").asText(), "state id should match last record id"); assertFalse(iter.hasNext(), "should have no more records"); @@ -171,7 +174,11 @@ void initialStateIsReturnedIfUnderlyingIteratorIsEmpty() { final var stream = catalog().getStreams().stream().findFirst().orElseThrow(); final var objectId = "64dfb6a7bb3c3458c30801f4"; - final var iter = new MongoDbStateIterator(mongoCursor, stream, Optional.of(new MongodbStreamState(objectId)), Instant.now(), CHECKPOINT_INTERVAL); + + stateManager.updateStreamState(stream.getStream().getName(), stream.getStream().getNamespace(), + new MongoDbStreamState(objectId)); + + final var iter = new MongoDbStateIterator(mongoCursor, stateManager, stream, Instant.now(), CHECKPOINT_INTERVAL); // the MongoDbStateIterator should return the following after each // `hasNext`/`next` call: @@ -183,7 +190,7 @@ void initialStateIsReturnedIfUnderlyingIteratorIsEmpty() { assertEquals(Type.STATE, message.getType()); assertEquals( objectId, - message.getState().getStream().getStreamState().get("id").asText(), + message.getState().getGlobal().getStreamStates().get(0).getStreamState().get("id").asText(), "state id should match initial state "); assertFalse(iter.hasNext(), "should have no more records"); diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbCdcStateHandlerTest.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbCdcStateHandlerTest.java new file mode 100644 index 0000000000000..2446ead6cf952 --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbCdcStateHandlerTest.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mongodb.internal.state; + +import static io.airbyte.integrations.debezium.internals.mongodb.MongoDbDebeziumConstants.ChangeEvent.SOURCE_ORDER; +import static io.airbyte.integrations.debezium.internals.mongodb.MongoDbDebeziumConstants.ChangeEvent.SOURCE_RESUME_TOKEN; +import static io.airbyte.integrations.debezium.internals.mongodb.MongoDbDebeziumConstants.ChangeEvent.SOURCE_SECONDS; +import static io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType.GLOBAL; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import io.airbyte.integrations.source.mongodb.internal.cdc.MongoDbCdcState; +import io.airbyte.integrations.source.mongodb.internal.cdc.MongoDbCdcStateHandler; +import io.airbyte.protocol.models.Jsons; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class MongoDbCdcStateHandlerTest { + + private static final String RESUME_TOKEN = "8264BEB9F3000000012B0229296E04"; + + private MongoDbStateManager mongoDbStateManager; + + private MongoDbCdcStateHandler mongoDbCdcStateHandler; + + @BeforeEach + void setup() { + mongoDbStateManager = MongoDbStateManager.createStateManager(null); + mongoDbCdcStateHandler = new MongoDbCdcStateHandler(mongoDbStateManager); + } + + @Test + void testSavingState() { + final int seconds = 1234567; + final int order = 1; + final Map offset = Map.of(SOURCE_SECONDS, String.valueOf(seconds), + SOURCE_ORDER, String.valueOf(order), + SOURCE_RESUME_TOKEN, RESUME_TOKEN); + + final AirbyteMessage airbyteMessage = mongoDbCdcStateHandler.saveState(offset, ""); + + assertNotNull(airbyteMessage); + assertEquals(AirbyteMessage.Type.STATE, airbyteMessage.getType()); + assertNotNull(airbyteMessage.getState()); + assertEquals(GLOBAL, airbyteMessage.getState().getType()); + assertEquals(new MongoDbCdcState(Jsons.jsonNode(offset)), + Jsons.object(airbyteMessage.getState().getGlobal().getSharedState(), MongoDbCdcState.class)); + } + + @Test + void testSaveStateAfterCompletionOfSnapshotOfNewStreams() { + final AirbyteMessage airbyteMessage = mongoDbCdcStateHandler.saveStateAfterCompletionOfSnapshotOfNewStreams(); + assertNotNull(airbyteMessage); + assertEquals(AirbyteMessage.Type.STATE, airbyteMessage.getType()); + assertNotNull(airbyteMessage.getState()); + assertEquals(GLOBAL, airbyteMessage.getState().getType()); + } + +} diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbStateManagerTest.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbStateManagerTest.java new file mode 100644 index 0000000000000..b8c365e9b38f3 --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/state/MongoDbStateManagerTest.java @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mongodb.internal.state; + +import static io.airbyte.integrations.debezium.internals.mongodb.MongoDbDebeziumConstants.ChangeEvent.SOURCE_ORDER; +import static io.airbyte.integrations.debezium.internals.mongodb.MongoDbDebeziumConstants.ChangeEvent.SOURCE_RESUME_TOKEN; +import static io.airbyte.integrations.debezium.internals.mongodb.MongoDbDebeziumConstants.ChangeEvent.SOURCE_SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.mongodb.internal.cdc.MongoDbCdcState; +import io.airbyte.protocol.models.v0.AirbyteGlobalState; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStreamState; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class MongoDbStateManagerTest { + + private static final String ID = "64c0029d95ad260d69ef28a0"; + private static final String RESUME_TOKEN = "8264BEB9F3000000012B0229296E04"; + private static final String STREAM_NAME = "test-collection"; + private static final String STREAM_NAMESPACE = "test-database"; + + @Test + void testCreationWithInitialState() { + final StreamDescriptor streamDescriptor = new StreamDescriptor().withNamespace(STREAM_NAMESPACE).withName(STREAM_NAME); + final int seconds = 123456789; + final int order = 1; + final Map offset = Map.of(SOURCE_SECONDS, String.valueOf(seconds), + SOURCE_ORDER, String.valueOf(order), + SOURCE_RESUME_TOKEN, RESUME_TOKEN); + final MongoDbCdcState cdcState = new MongoDbCdcState(Jsons.jsonNode(offset)); + final MongoDbStreamState mongoDbStreamState = new MongoDbStreamState(ID); + final JsonNode sharedState = Jsons.jsonNode(cdcState); + final JsonNode streamState = Jsons.jsonNode(mongoDbStreamState); + final AirbyteStreamState airbyteStreamState = new AirbyteStreamState().withStreamDescriptor(streamDescriptor).withStreamState(streamState); + final AirbyteGlobalState airbyteGlobalState = new AirbyteGlobalState().withSharedState(sharedState).withStreamStates(List.of(airbyteStreamState)); + final AirbyteStateMessage airbyteStateMessage = + new AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.GLOBAL).withGlobal(airbyteGlobalState); + + final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(Jsons.jsonNode(List.of(airbyteStateMessage))); + assertNotNull(stateManager); + assertNotNull(stateManager.getCdcState()); + assertEquals(seconds, stateManager.getCdcState().state().get(SOURCE_SECONDS).asInt()); + assertEquals(order, stateManager.getCdcState().state().get(SOURCE_ORDER).asInt()); + assertEquals(RESUME_TOKEN, stateManager.getCdcState().state().get(SOURCE_RESUME_TOKEN).asText()); + assertTrue(stateManager.getStreamState(STREAM_NAME, STREAM_NAMESPACE).isPresent()); + assertEquals(ID, stateManager.getStreamState(STREAM_NAME, STREAM_NAMESPACE).get().id()); + } + + @Test + void testCreationWithInitialNullState() { + final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(null); + assertNotNull(stateManager); + assertNull(stateManager.getCdcState()); + } + + @Test + void testCreationWithInitialEmptyState() { + final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(Jsons.emptyObject()); + assertNotNull(stateManager); + assertNull(stateManager.getCdcState()); + } + + @Test + void testCreationWithInitialEmptyListState() { + final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(Jsons.jsonNode(List.of())); + assertNotNull(stateManager); + assertNull(stateManager.getCdcState()); + } + + @Test + void testCreationWithInitialStateTooManyMessages() { + final List stateMessages = List.of(new AirbyteStateMessage(), new AirbyteStateMessage()); + assertThrows(IllegalStateException.class, () -> MongoDbStateManager.createStateManager(Jsons.jsonNode(stateMessages))); + } + + @Test + void testUpdateCdcState() { + final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(null); + assertNotNull(stateManager); + assertNull(stateManager.getCdcState()); + + final Map offset = Map.of(SOURCE_SECONDS, String.valueOf(123456789), + SOURCE_ORDER, String.valueOf(1), + SOURCE_RESUME_TOKEN, RESUME_TOKEN); + final MongoDbCdcState cdcState = new MongoDbCdcState(Jsons.jsonNode(offset)); + stateManager.updateCdcState(cdcState); + assertNotNull(stateManager.getCdcState()); + assertEquals(cdcState, stateManager.getCdcState()); + } + + @Test + void testGeneratingAirbyteStateMessage() { + final StreamDescriptor streamDescriptor = new StreamDescriptor().withNamespace(STREAM_NAMESPACE).withName(STREAM_NAME); + final int seconds = 123456789; + final int order = 1; + final Map offset = Map.of(SOURCE_SECONDS, String.valueOf(seconds), + SOURCE_ORDER, String.valueOf(order), + SOURCE_RESUME_TOKEN, RESUME_TOKEN); + final MongoDbCdcState cdcState = new MongoDbCdcState(Jsons.jsonNode(offset)); + final MongoDbStreamState mongoDbStreamState = new MongoDbStreamState(ID); + final JsonNode sharedState = Jsons.jsonNode(cdcState); + final JsonNode streamState = Jsons.jsonNode(mongoDbStreamState); + final AirbyteStreamState airbyteStreamState = new AirbyteStreamState().withStreamDescriptor(streamDescriptor).withStreamState(streamState); + final AirbyteGlobalState airbyteGlobalState = new AirbyteGlobalState().withSharedState(sharedState).withStreamStates(List.of(airbyteStreamState)); + final AirbyteStateMessage airbyteStateMessage = + new AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.GLOBAL).withGlobal(airbyteGlobalState); + + final MongoDbStateManager stateManager = MongoDbStateManager.createStateManager(Jsons.jsonNode(List.of(airbyteStateMessage))); + final AirbyteStateMessage generated = stateManager.toState(); + + assertNotNull(generated); + assertEquals(airbyteStateMessage, generated); + + final Map offset2 = Map.of(SOURCE_SECONDS, String.valueOf(1112223334), + SOURCE_ORDER, String.valueOf(2), + SOURCE_RESUME_TOKEN, RESUME_TOKEN); + final MongoDbCdcState updatedCdcState = new MongoDbCdcState(Jsons.jsonNode(offset2)); + stateManager.updateCdcState(updatedCdcState); + + final AirbyteStateMessage generated2 = stateManager.toState(); + + assertNotNull(generated2); + assertEquals(updatedCdcState, Jsons.object(generated2.getGlobal().getSharedState(), MongoDbCdcState.class)); + + final MongoDbStreamState updatedStreamState = new MongoDbStreamState("updated"); + stateManager.updateStreamState(STREAM_NAME, STREAM_NAMESPACE, updatedStreamState); + final AirbyteStateMessage generated3 = stateManager.toState(); + + assertNotNull(generated3); + assertEquals(updatedStreamState.id(), + Jsons.object(generated3.getGlobal().getStreamStates().get(0).getStreamState(), MongoDbStreamState.class).id()); + } + +}