diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 6c96a14a67f81..245863727f263 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -77,6 +77,9 @@ services: - db - message_queue - schemaregistry + - mongodb + - mongodb-setup + - mongo_data_generator volumes: - ..:/risingwave @@ -274,3 +277,42 @@ services: interval: 5s timeout: 5s retries: 5 + + mongodb: + image: mongo:4.4 + ports: + - "27017" + command: --replSet rs0 --oplogSize 128 + restart: always + healthcheck: + test: "echo 'db.runCommand({ping: 1})' | mongo" + interval: 5s + timeout: 10s + retries: 3 + + mongodb-setup: + image: mongo:4.4 + container_name: mongodb-setup + depends_on: + - mongodb + entrypoint: + [ + "bash", + "-c", + "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10" + ] + restart: "no" + volumes: + - ./mongodb/config-replica.js:/config-replica.js + + mongo_data_generator: + build: + context: . + dockerfile: ./mongodb/Dockerfile.generator + container_name: mongo_data_generator + depends_on: + - mongodb + environment: + MONGO_HOST: mongodb + MONGO_PORT: 27017 + MONGO_DB_NAME: random_data diff --git a/ci/mongodb/Dockerfile.generator b/ci/mongodb/Dockerfile.generator new file mode 100644 index 0000000000000..165aea42ec843 --- /dev/null +++ b/ci/mongodb/Dockerfile.generator @@ -0,0 +1,23 @@ +# Use an official Python runtime as a parent image +FROM python:3.8-slim + +# Set the working directory to /app +WORKDIR /app + +# Copy the requirements file into the container at /app +COPY ./mongodb/requirements.txt /app +COPY ./mongodb/app.py /app + +# Install any needed packages specified in requirements.txt +RUN pip install -r requirements.txt + +# Make port 5000 available to the world outside this container +EXPOSE 5000 + +# Define environment variables +ENV MONGO_HOST mongodb +ENV MONGO_PORT 27017 +ENV MONGO_DB_NAME random_data + +# Run app.py when the container launches +CMD ["python", "app.py"] diff --git a/ci/mongodb/app.py b/ci/mongodb/app.py new file 
mode 100644 index 0000000000000..dee00725d9377 --- /dev/null +++ b/ci/mongodb/app.py @@ -0,0 +1,41 @@ +import os +import pymongo +from faker import Faker + +# To check the data through mongosh or mongo, run the following command: +# > mongosh mongodb://127.0.0.1:27017 +# > rs0 [direct: primary] test> use random_data +# > rs0 [direct: primary] random_data> db.users.find() +# > rs0 [direct: primary] random_data> db.users.count() + +# Connect to MongoDB +mongo_host = os.environ["MONGO_HOST"] +mongo_port = os.environ["MONGO_PORT"] +mongo_db_name = os.environ["MONGO_DB_NAME"] + +url = f"mongodb://{mongo_host}:{mongo_port}" +client = pymongo.MongoClient(url) +db = client[mongo_db_name] + +# Generate random data +fake = Faker() +collection = db["users"] + +for _ in range(55): + user_data = { + "name": fake.name(), + "address": fake.address(), + "email": fake.email(), + } + collection.insert_one(user_data) + +# Count the number of records in the collection +total_records = collection.count_documents({}) + +# Close the MongoDB connection +client.close() +print(f"Random data generated and inserted into MongoDB: {url}") + +# Print insertion summary +print("Insertion summary:") +print(f"Total records in the collection: {total_records}") diff --git a/integration_tests/debezium-mongo/config-replica.js b/ci/mongodb/config-replica.js similarity index 100% rename from integration_tests/debezium-mongo/config-replica.js rename to ci/mongodb/config-replica.js diff --git a/integration_tests/debezium-mongo/requirements.txt b/ci/mongodb/requirements.txt similarity index 100% rename from integration_tests/debezium-mongo/requirements.txt rename to ci/mongodb/requirements.txt diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index ec04a1d6863cf..aa6da360fbe52 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -61,6 +61,20 @@ echo "--- starting risingwave cluster" 
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ cargo make ci-start ci-1cn-1fe-with-recovery +echo "--- mongodb cdc test" +# install the mongo shell +wget http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb +wget https://repo.mongodb.org/apt/ubuntu/dists/focal/mongodb-org/4.4/multiverse/binary-amd64/mongodb-org-shell_4.4.28_amd64.deb +dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb +dpkg -i mongodb-org-shell_4.4.28_amd64.deb + +echo '> ping mongodb' +echo 'db.runCommand({ping: 1})' | mongo mongodb://mongodb:27017 +echo '> rs config' +echo 'rs.conf()' | mongo mongodb://mongodb:27017 +echo '> run test..' +sqllogictest -p 4566 -d dev './e2e_test/source/cdc/mongodb/**/*.slt' + echo "--- inline cdc test" export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt' diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py index 236559ed566fa..6332b98ecca9f 100644 --- a/ci/scripts/gen-integration-test-yaml.py +++ b/ci/scripts/gen-integration-test-yaml.py @@ -12,6 +12,7 @@ 'schema-registry': ['json'], 'mysql-cdc': ['json'], 'postgres-cdc': ['json'], + 'mongodb-cdc': ['json'], 'mysql-sink': ['json'], 'postgres-sink': ['json'], 'iceberg-cdc': ['json'], diff --git a/e2e_test/source/cdc/mongodb/mongodb_basic.slt b/e2e_test/source/cdc/mongodb/mongodb_basic.slt new file mode 100644 index 0000000000000..f3a815df0572b --- /dev/null +++ b/e2e_test/source/cdc/mongodb/mongodb_basic.slt @@ -0,0 +1,28 @@ +# CDC source basic test +control substitution on + +statement ok +CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH ( + connector = 'mongodb-cdc', + mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0', + collection.name = 'random_data.*' +); + +statement ok +CREATE MATERIALIZED VIEW normalized_users AS +SELECT + payload ->> 'name' as name, + payload ->> 'email' as email, + payload ->> 'address' 
as address +FROM + users; + +sleep 5s + +query I +select count(*) from normalized_users; +---- +55 + +statement ok +DROP TABLE users cascade diff --git a/integration_tests/debezium-mongo/docker-compose.yaml b/integration_tests/debezium-mongo/docker-compose.yaml index afa753440f77d..4ae90ea22eb94 100644 --- a/integration_tests/debezium-mongo/docker-compose.yaml +++ b/integration_tests/debezium-mongo/docker-compose.yaml @@ -28,32 +28,14 @@ services: service: message_queue mongodb: - image: mongo:4.4 - container_name: mongodb - ports: - - "27017:27017" - command: --replSet rs0 --oplogSize 128 - restart: always - healthcheck: - test: "echo 'db.runCommand({ping: 1})' | mongo" - interval: 5s - timeout: 10s - retries: 3 + extends: + file: ../mongodb/docker-compose.yaml + service: mongodb mongodb-setup: - image: mongo:4.4 - container_name: mongodb-setup - depends_on: - - mongodb - entrypoint: - [ - "bash", - "-c", - "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10" - ] - restart: "no" - volumes: - - ./config-replica.js:/config-replica.js + extends: + file: ../mongodb/docker-compose.yaml + service: mongodb-setup debezium: image: debezium/connect:1.9 @@ -78,16 +60,10 @@ services: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://message_queue:8081 random_data_generator: - build: - context: . 
- dockerfile: Dockerfile.generator - container_name: random_data_generator - depends_on: - - mongodb - environment: - MONGO_HOST: mongodb - MONGO_PORT: 27017 - MONGO_DB_NAME: random_data + extends: + file: ../mongodb/docker-compose.yaml + service: random_data_generator + register-mongodb-connector: image: curlimages/curl:7.79.1 diff --git a/integration_tests/mongodb-cdc/create_mv.sql b/integration_tests/mongodb-cdc/create_mv.sql new file mode 100644 index 0000000000000..17ce354009c7b --- /dev/null +++ b/integration_tests/mongodb-cdc/create_mv.sql @@ -0,0 +1,7 @@ +CREATE MATERIALIZED VIEW normalized_users AS +SELECT + payload ->> 'name' as name, + payload ->> 'email' as email, + payload ->> 'address' as address +FROM + users; \ No newline at end of file diff --git a/integration_tests/mongodb-cdc/create_source.sql b/integration_tests/mongodb-cdc/create_source.sql new file mode 100644 index 0000000000000..e26160a40f258 --- /dev/null +++ b/integration_tests/mongodb-cdc/create_source.sql @@ -0,0 +1,5 @@ +CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH ( + connector = 'mongodb-cdc', + mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0', + collection.name = 'random_data.*' +); \ No newline at end of file diff --git a/integration_tests/mongodb-cdc/data_check b/integration_tests/mongodb-cdc/data_check new file mode 100644 index 0000000000000..c57752e1fd9b2 --- /dev/null +++ b/integration_tests/mongodb-cdc/data_check @@ -0,0 +1 @@ +users,normalized_users \ No newline at end of file diff --git a/integration_tests/mongodb-cdc/docker-compose.yaml b/integration_tests/mongodb-cdc/docker-compose.yaml new file mode 100644 index 0000000000000..60d477945b38b --- /dev/null +++ b/integration_tests/mongodb-cdc/docker-compose.yaml @@ -0,0 +1,48 @@ +--- +version: "3" +services: + risingwave-standalone: + extends: + file: ../../docker/docker-compose.yml + service: risingwave-standalone + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + grafana-0: + 
extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 + mongodb: + extends: + file: ../mongodb/docker-compose.yaml + service: mongodb + mongodb-setup: + extends: + file: ../mongodb/docker-compose.yaml + service: mongodb-setup + random_data_generator: + extends: + file: ../mongodb/docker-compose.yaml + service: random_data_generator + +volumes: + risingwave-standalone: + external: false + etcd-0: + external: false + grafana-0: + external: false + minio-0: + external: false + prometheus-0: + external: false +name: risingwave-compose diff --git a/integration_tests/debezium-mongo/Dockerfile.generator b/integration_tests/mongodb/Dockerfile.generator similarity index 100% rename from integration_tests/debezium-mongo/Dockerfile.generator rename to integration_tests/mongodb/Dockerfile.generator diff --git a/integration_tests/debezium-mongo/app.py b/integration_tests/mongodb/app.py similarity index 100% rename from integration_tests/debezium-mongo/app.py rename to integration_tests/mongodb/app.py diff --git a/integration_tests/mongodb/config-replica.js b/integration_tests/mongodb/config-replica.js new file mode 100644 index 0000000000000..29715221b6011 --- /dev/null +++ b/integration_tests/mongodb/config-replica.js @@ -0,0 +1,7 @@ +rsconf = { + _id: "rs0", + members: [{ _id: 0, host: "mongodb:27017", priority: 1.0 }], +}; +rs.initiate(rsconf); +rs.status(); + diff --git a/integration_tests/mongodb/docker-compose.yaml b/integration_tests/mongodb/docker-compose.yaml new file mode 100644 index 0000000000000..59ac89215ec14 --- /dev/null +++ b/integration_tests/mongodb/docker-compose.yaml @@ -0,0 +1,39 @@ +version: "3" +services: + mongodb: + image: mongo:4.4 + container_name: mongodb + ports: + - "27017:27017" + command: --replSet rs0 --oplogSize 128 + restart: always + healthcheck: + test: 
"echo 'db.runCommand({ping: 1})' | mongo" + interval: 5s + timeout: 10s + retries: 3 + mongodb-setup: + image: mongo:4.4 + container_name: mongodb-setup + depends_on: + - mongodb + entrypoint: + [ + "bash", + "-c", + "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10" + ] + restart: "no" + volumes: + - ./config-replica.js:/config-replica.js + random_data_generator: + build: + context: . + dockerfile: Dockerfile.generator + container_name: random_data_generator + depends_on: + - mongodb + environment: + MONGO_HOST: mongodb + MONGO_PORT: 27017 + MONGO_DB_NAME: random_data diff --git a/integration_tests/mongodb/requirements.txt b/integration_tests/mongodb/requirements.txt new file mode 100644 index 0000000000000..2bb533309a7a5 --- /dev/null +++ b/integration_tests/mongodb/requirements.txt @@ -0,0 +1,2 @@ +pymongo +Faker diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CdcConnectorException.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CdcConnectorException.java new file mode 100644 index 0000000000000..a389b08df0fa9 --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/CdcConnectorException.java @@ -0,0 +1,41 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.risingwave.connector.source.common; + +public class CdcConnectorException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public CdcConnectorException() {} + + public CdcConnectorException(String message) { + super(message); + } + + public CdcConnectorException(String message, Throwable cause) { + super(message, cause); + } + + public CdcConnectorException(Throwable cause) { + super(cause); + } + + public CdcConnectorException( + String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java index 46b0c36304d78..8175ef9a48025 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java @@ -14,9 +14,14 @@ package com.risingwave.connector.source.common; +import com.mongodb.ConnectionString; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; +import java.util.List; import java.util.Map; +import org.bson.BsonDocument; +import org.bson.Document; +import org.bson.conversions.Bson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,9 +29,20 @@ public class MongoDbValidator extends DatabaseValidator { private static final Logger LOG = LoggerFactory.getLogger(MongoDbValidator.class); String mongodbUrl; + boolean isShardedCluster; + + ConnectionString connStr; + MongoClient client; + static final String USERS = "users"; + static final String ROLES = "roles"; + static final String INHERITED_ROLES = "inheritedRoles"; + static 
final String INHERITED_PRIVILEGES = "inheritedPrivileges"; public MongoDbValidator(Map userProps) { this.mongodbUrl = userProps.get("mongodb.url"); + this.connStr = new ConnectionString(mongodbUrl); + this.isShardedCluster = false; + this.client = MongoClients.create(connStr.toString()); } @Override @@ -38,15 +54,79 @@ public void validateDbConfig() { } } + boolean checkReadRoleForAdminDb(List roles) { + for (Document roleDoc : roles) { + var db = roleDoc.getString("db"); + var role = roleDoc.getString("role"); + if (db.equals("admin") + && (role.equals("readWrite") + || role.equals("read") + || role.equals("readWriteAnyDatabase") + || role.equals("readAnyDatabase"))) { + LOG.info("user has the appropriate roles to read the admin database"); + return true; + } + } + return false; + } + @Override void validateUserPrivilege() { - // TODO: check user privilege // https://debezium.io/documentation/reference/stable/connectors/mongodb.html#setting-up-mongodb // You must also have a MongoDB user that has the appropriate roles to read the admin // database where the oplog can be read. Additionally, the user must also be able to read // the config database in the configuration server of a sharded cluster and must have // listDatabases privilege action. When change streams are used (the default) the user also // must have cluster-wide privilege actions find and changeStream. 
+ + if (null != connStr.getCredential()) { + var secret = connStr.getCredential(); + var authDb = client.getDatabase(secret.getSource()); + + Bson command = + BsonDocument.parse( + String.format( + "{usersInfo: \"%s\", showPrivileges: true}", + secret.getUserName())); + + Document ret = authDb.runCommand(command); + LOG.info("mongodb userInfo: {}", ret.toJson()); + + List users = ret.getEmbedded(List.of(USERS), List.class); + LOG.info("mongodb users => {}", users); + if (users.isEmpty()) { + throw new CdcConnectorException("user not found in the database"); + } + + // https://debezium.io/documentation/reference/stable/connectors/mongodb.html#setting-up-mongodb + // You must also have a MongoDB user that has the appropriate roles to read the admin + // database where the oplog can be read. + Document user = users.get(0); + List roles = user.getEmbedded(List.of(ROLES), List.class); + boolean hasReadForAdmin = false; + if (!roles.isEmpty()) { + // check direct roles + hasReadForAdmin = checkReadRoleForAdminDb(roles); + if (!hasReadForAdmin) { + // check inherited roles + List inheriRoles = + user.getEmbedded(List.of(INHERITED_ROLES), List.class); + if (!inheriRoles.isEmpty()) { + hasReadForAdmin = checkReadRoleForAdminDb(inheriRoles); + } + } + } + if (!hasReadForAdmin) { + throw new CdcConnectorException( + "user does not have the appropriate roles to read the admin database"); + } + + // When change streams are used (the default) the user also + // must have cluster-wide privilege actions find and changeStream. 
+ // TODO: may check the privilege actions find and changeStream + } + + // TODO: may check privilege for sharded cluster } @Override diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/mongodb.properties b/java/connector-node/risingwave-connector-service/src/main/resources/mongodb.properties index a1a5b92e9f63f..9181449b0400b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/mongodb.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/mongodb.properties @@ -1,11 +1,11 @@ -# configs for postgres conneoctor +# https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-connector-properties connector.class=io.debezium.connector.mongodb.MongoDbConnector # default snapshot mode to initial snapshot.mode=${debezium.snapshot.mode:-initial} mongodb.connection.string=${mongodb.url} collection.include.list=${collection.name} -# default heartbeat interval 5 mins -heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000} +# default heartbeat interval 60 seconds +heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000} # TODO: set this field in the code name=${collection.name} provide.transaction.metadata=${transactional:-false} @@ -13,3 +13,8 @@ provide.transaction.metadata=${transactional:-false} capture.mode=${debezium.capture.mode:-change_streams_update_full} # disable tombstones event tombstones.on.delete=${debezium.tombstones.on.delete:-false} +# The number of milliseconds the driver will wait before a new connection attempt is aborted. +mongodb.connect.timeout.ms=${debezium.mongodb.connect.timeout.ms:-15000} +# The frequency that the cluster monitor attempts to reach each server. 
+mongodb.heartbeat.frequency.ms=${debezium.mongodb.heartbeat.frequency.ms:-10000} + diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/MongoDbSourceTest.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/MongoDbSourceTest.java new file mode 100644 index 0000000000000..0e91cfd1ed654 --- /dev/null +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/MongoDbSourceTest.java @@ -0,0 +1,92 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.risingwave.connector.source; + +import com.risingwave.connector.ConnectorServiceImpl; +import com.risingwave.proto.ConnectorServiceProto; +import io.grpc.*; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import org.junit.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MongoDbSourceTest { + private static final Logger LOG = LoggerFactory.getLogger(MongoDbSourceTest.class.getName()); + + public static Server connectorService = + ServerBuilder.forPort(SourceTestClient.DEFAULT_PORT) + .addService(new ConnectorServiceImpl()) + .build(); + + public static SourceTestClient testClient = + new SourceTestClient( + Grpc.newChannelBuilder( + "localhost:" + SourceTestClient.DEFAULT_PORT, + InsecureChannelCredentials.create()) + .build()); + + @BeforeClass + public static void init() { + try { + connectorService.start(); + } catch (Exception e) { + LOG.error("failed to start connector service", e); + Assert.fail(); + } + } + + @AfterClass + public static void cleanup() { + connectorService.shutdown(); + } + + // manually test + @Test + @Ignore + public void testSnapshotLoad() throws Exception { + Map props = new HashMap<>(); + props.put("mongodb.url", "mongodb://localhost:27017/?replicaSet=rs0"); + props.put("collection.name", "dev.*"); + Iterator eventStream = + testClient.getEventStream(ConnectorServiceProto.SourceType.MONGODB, 3001, props); + Callable countTask = + () -> { + int count = 0; + while (eventStream.hasNext()) { + List messages = + eventStream.next().getEventsList(); + for (ConnectorServiceProto.CdcMessage msg : messages) { + if (!msg.getPayload().isBlank()) { + count++; + } + // Only read 10 messages + if (count >= 10) { + return count; + } + } + } + return count; + }; + + var pool = Executors.newFixedThreadPool(1); + var result = pool.submit(countTask); + Assert.assertEquals(10, 
result.get().intValue()); + } +} diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/MongoDbValidatorTest.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/MongoDbValidatorTest.java new file mode 100644 index 0000000000000..cecd8fb4855b0 --- /dev/null +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/MongoDbValidatorTest.java @@ -0,0 +1,32 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.risingwave.connector.source; + +import com.risingwave.connector.source.common.MongoDbValidator; +import java.util.HashMap; +import org.junit.Ignore; +import org.junit.Test; + +public class MongoDbValidatorTest { + + @Ignore // manual test + @Test + public void testValidate() { + var userProps = new HashMap(); + userProps.put("mongodb.url", "mongodb://rwcdc:123456@localhost:27017/?authSource=admin"); + MongoDbValidator validator = new MongoDbValidator(userProps); + validator.validateAll(); + } +} diff --git a/java/connector-node/risingwave-source-test/src/test/resources/log4j2.properties b/java/connector-node/risingwave-source-test/src/test/resources/log4j2.properties index 856287e831da8..e8d935eeae5f3 100644 --- a/java/connector-node/risingwave-source-test/src/test/resources/log4j2.properties +++ b/java/connector-node/risingwave-source-test/src/test/resources/log4j2.properties @@ -8,7 +8,3 @@ appender.console.layout.type=PatternLayout appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%t] %c{2}:%L - %m%n rootLogger.appenderRefs=console rootLogger.appenderRef.console.ref=stdout -logger.connector.name=com.risingwave.connector.source -logger.connector.level=INFO -logger.connector.appenderRefs=console -logger.connector.appenderRef.console.ref=stdout diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs index 9817512baa078..82c36a29892db 100644 --- a/src/connector/src/source/cdc/split.rs +++ b/src/connector/src/source/cdc/split.rs @@ -47,6 +47,32 @@ trait CdcSplitTrait: Send + Sync { fn start_offset(&self) -> &Option; fn is_snapshot_done(&self) -> bool; fn update_with_offset(&mut self, start_offset: String) -> ConnectorResult<()>; + + // MySQL and MongoDB share the same logic to extract the snapshot flag + fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult { + // if snapshot_done is already true, it won't be changed + let mut snapshot_done = self.is_snapshot_done(); + if 
snapshot_done { + return Ok(snapshot_done); + } + + let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| { + format!( + "invalid cdc offset: {}, split: {}", + start_offset, + self.split_id() + ) + })?; + + // heartbeat event should not update the `snapshot_done` flag + if !dbz_offset.is_heartbeat { + snapshot_done = match dbz_offset.source_offset.snapshot { + Some(val) => !val, + None => true, + }; + } + Ok(snapshot_done) + } } #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)] @@ -91,27 +117,9 @@ impl CdcSplitTrait for MySqlCdcSplit { } fn update_with_offset(&mut self, start_offset: String) -> ConnectorResult<()> { - let mut snapshot_done = self.inner.snapshot_done; - if !snapshot_done { - let dbz_offset: DebeziumOffset = - serde_json::from_str(&start_offset).with_context(|| { - format!( - "invalid mysql offset: {}, split: {}", - start_offset, self.inner.split_id - ) - })?; - - // heartbeat event should not update the `snapshot_done` flag - if !dbz_offset.is_heartbeat { - snapshot_done = match dbz_offset.source_offset.snapshot { - Some(val) => !val, - None => true, - }; - } - } - self.inner.start_offset = Some(start_offset); // if snapshot_done is already true, it won't be updated - self.inner.snapshot_done = snapshot_done; + self.inner.snapshot_done = self.extract_snapshot_flag(start_offset.as_str())?; + self.inner.start_offset = Some(start_offset); Ok(()) } } @@ -144,29 +152,34 @@ impl CdcSplitTrait for PostgresCdcSplit { } fn update_with_offset(&mut self, start_offset: String) -> ConnectorResult<()> { - let mut snapshot_done = self.inner.snapshot_done; - if !snapshot_done { - let dbz_offset: DebeziumOffset = - serde_json::from_str(&start_offset).with_context(|| { - format!( - "invalid postgres offset: {}, split: {}", - start_offset, self.inner.split_id - ) - })?; - - // heartbeat event should not update the `snapshot_done` flag - if !dbz_offset.is_heartbeat { - snapshot_done = dbz_offset - .source_offset - 
.last_snapshot_record - .unwrap_or(false); - } - } + self.inner.snapshot_done = self.extract_snapshot_flag(start_offset.as_str())?; self.inner.start_offset = Some(start_offset); - // if snapshot_done is already true, it won't be updated - self.inner.snapshot_done = snapshot_done; Ok(()) } + + fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult { + // if snapshot_done is already true, it won't be changed + let mut snapshot_done = self.is_snapshot_done(); + if snapshot_done { + return Ok(snapshot_done); + } + + let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| { + format!( + "invalid postgres offset: {}, split: {}", + start_offset, self.inner.split_id + ) + })?; + + // heartbeat event should not update the `snapshot_done` flag + if !dbz_offset.is_heartbeat { + snapshot_done = dbz_offset + .source_offset + .last_snapshot_record + .unwrap_or(false); + } + Ok(snapshot_done) + } } impl MongoDbCdcSplit { @@ -194,29 +207,9 @@ impl CdcSplitTrait for MongoDbCdcSplit { } fn update_with_offset(&mut self, start_offset: String) -> ConnectorResult<()> { - let mut snapshot_done = self.inner.snapshot_done; - // extract snapshot state from debezium offset - if !snapshot_done { - let dbz_offset: DebeziumOffset = - serde_json::from_str(&start_offset).with_context(|| { - format!( - "invalid mongodb offset: {}, split: {}", - start_offset, self.inner.split_id - ) - })?; - - // heartbeat event should not update the `snapshot_done` flag - if !dbz_offset.is_heartbeat { - snapshot_done = match dbz_offset.source_offset.snapshot { - Some(val) => !val, - None => true, - }; - } - } - - self.inner.start_offset = Some(start_offset); // if snapshot_done is already true, it will remain true - self.inner.snapshot_done = snapshot_done; + self.inner.snapshot_done = self.extract_snapshot_flag(start_offset.as_str())?; + self.inner.start_offset = Some(start_offset); Ok(()) } }