test(mongodb-cdc): add integration and e2e test #15003

Merged
merged 49 commits into from
Mar 21, 2024
49 commits
7399c7c
WIP: mongodb
StrikeW Jan 26, 2024
6a5bbb9
mongo source unit test
StrikeW Jan 27, 2024
fd4aa01
Add mongodb split and refactor cdc split
StrikeW Feb 1, 2024
3798597
fix sql parser
StrikeW Feb 2, 2024
d62252c
fix validate
StrikeW Feb 3, 2024
b4709b6
minor
StrikeW Feb 3, 2024
cc777ae
clean code
StrikeW Feb 3, 2024
e4782a5
clean node 2
StrikeW Feb 3, 2024
8e73db3
support delete
StrikeW Feb 3, 2024
d170224
make clippy happy
StrikeW Feb 3, 2024
8433b84
Merge remote-tracking branch 'origin/main' into siyuan/mongodb-source
StrikeW Feb 3, 2024
6fab333
clean code
StrikeW Feb 3, 2024
17af015
update snapshot_done flag when snapshot is finished
StrikeW Feb 4, 2024
1fc3a52
minor
StrikeW Feb 4, 2024
ef7ca65
clippy
StrikeW Feb 4, 2024
bcabf96
minor
StrikeW Feb 5, 2024
a267376
add integration test
StrikeW Feb 5, 2024
3863a23
refactor integration test & add e2e test
StrikeW Feb 5, 2024
cbd0b17
refine
StrikeW Feb 5, 2024
20af6f4
fix e2e
StrikeW Feb 5, 2024
142a62c
try fix
StrikeW Feb 5, 2024
9ea705d
refine e2e test
StrikeW Feb 5, 2024
b810b7f
try fix ci
StrikeW Feb 5, 2024
45398f9
Merge remote-tracking branch 'origin/main' into siyuan/mongodb-source
StrikeW Feb 18, 2024
5c7420f
fix comments
StrikeW Feb 18, 2024
5987352
throw error
StrikeW Feb 18, 2024
a3f3f54
fix
StrikeW Feb 18, 2024
0c313db
Merge branch 'siyuan/mongodb-source' into siyuan/mongodb-test
StrikeW Feb 19, 2024
f53cf2a
impl validator
StrikeW Feb 20, 2024
3955ee2
Merge remote-tracking branch 'origin/main' into siyuan/mongodb-source
StrikeW Feb 26, 2024
71f6f9d
fix clippy
StrikeW Feb 26, 2024
6ec7be3
add todo for mongodb user privilege check
StrikeW Feb 26, 2024
89f1911
minor
StrikeW Feb 26, 2024
5388f6c
add some comments
StrikeW Feb 29, 2024
7ed6661
Merge remote-tracking branch 'origin/main' into siyuan/mongodb-source
StrikeW Feb 29, 2024
dc0a120
Merge branch 'siyuan/mongodb-source' into siyuan/mongodb-test
StrikeW Mar 1, 2024
0d62a85
WIP: check mongo privilege
StrikeW Mar 5, 2024
08a5d60
Merge remote-tracking branch 'origin/main' into siyuan/mongodb-test
StrikeW Mar 6, 2024
5e6c0dd
format
StrikeW Mar 12, 2024
eba1236
Merge remote-tracking branch 'origin/main' into siyuan/mongodb-test
StrikeW Mar 12, 2024
1539f36
format
StrikeW Mar 12, 2024
54c6527
Merge remote-tracking branch 'origin/main' into siyuan/mongodb-test
StrikeW Mar 18, 2024
e16f24a
tested with mongodb cloud
StrikeW Mar 18, 2024
1490355
refactor snapshot flag update
StrikeW Mar 18, 2024
1d9a2c7
minor
StrikeW Mar 18, 2024
bd1e824
fix
StrikeW Mar 18, 2024
52f6eb2
add mongodb-cdc it test
StrikeW Mar 18, 2024
85a88a6
revert
StrikeW Mar 18, 2024
aaf9453
Merge remote-tracking branch 'origin/main' into siyuan/mongodb-test
StrikeW Mar 18, 2024
42 changes: 42 additions & 0 deletions ci/docker-compose.yml
@@ -77,6 +77,9 @@ services:
       - db
       - message_queue
       - schemaregistry
+      - mongodb
+      - mongodb-setup
+      - mongo_data_generator
     volumes:
       - ..:/risingwave
 
@@ -274,3 +277,42 @@ services:
       interval: 5s
       timeout: 5s
       retries: 5
+
+  mongodb:
+    image: mongo:4.4
+    ports:
+      - "27017"
+    command: --replSet rs0 --oplogSize 128
+    restart: always
+    healthcheck:
+      test: "echo 'db.runCommand({ping: 1})' | mongo"
+      interval: 5s
+      timeout: 10s
+      retries: 3
+
+  mongodb-setup:
+    image: mongo:4.4
+    container_name: mongodb-setup
+    depends_on:
+      - mongodb
+    entrypoint:
+      [
+        "bash",
+        "-c",
+        "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10"
+      ]
+    restart: "no"
+    volumes:
+      - ./mongodb/config-replica.js:/config-replica.js
+
+  mongo_data_generator:
+    build:
+      context: .
+      dockerfile: ./mongodb/Dockerfile.generator
+    container_name: mongo_data_generator
+    depends_on:
+      - mongodb
+    environment:
+      MONGO_HOST: mongodb
+      MONGO_PORT: 27017
+      MONGO_DB_NAME: random_data
23 changes: 23 additions & 0 deletions ci/mongodb/Dockerfile.generator
@@ -0,0 +1,23 @@
# Use an official Python runtime as a parent image
FROM python:3.8-slim

# Set the working directory to /app
WORKDIR /app

# Copy the requirements file into the container at /app
COPY ./mongodb/requirements.txt /app
COPY ./mongodb/app.py /app

# Install any needed packages specified in requirements.txt
RUN pip install -r requirements.txt

# Make port 5000 available to the world outside this container
EXPOSE 5000

# Define environment variables
ENV MONGO_HOST mongodb
ENV MONGO_PORT 27017
ENV MONGO_DB_NAME random_data

# Run app.py when the container launches
CMD ["python", "app.py"]
41 changes: 41 additions & 0 deletions ci/mongodb/app.py
@@ -0,0 +1,41 @@
import os
import pymongo
from faker import Faker

# To check the data through mongosh or mongo, run the following command:
# > mongosh mongodb://admin:<password>@<host>:27017
# > rs0 [direct: primary] test> use random_data
# > rs0 [direct: primary] random_data> db.users.find()
# > rs0 [direct: primary] random_data> db.users.count()

# Connect to MongoDB
mongo_host = os.environ["MONGO_HOST"]
mongo_port = os.environ["MONGO_PORT"]
mongo_db_name = os.environ["MONGO_DB_NAME"]

url = f"mongodb://{mongo_host}:{mongo_port}"
client = pymongo.MongoClient(url)
db = client[mongo_db_name]

# Generate random data
fake = Faker()
collection = db["users"]

for _ in range(55):
    user_data = {
        "name": fake.name(),
        "address": fake.address(),
        "email": fake.email(),
    }
    collection.insert_one(user_data)

# Count the number of records in the collection
total_records = collection.count_documents({})

# Close the MongoDB connection
client.close()
print(f"Random data generated and inserted into MongoDB: {url}")

# Print insertion summary
print("Insertion summary:")
print(f"Total records in the collection: {total_records}")
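
Note: the compose services in this PR rely on fixed `sleep 10` delays and a plain `depends_on`, so the generator can start before MongoDB accepts connections. One possible hardening is sketched below. `wait_until` is a hypothetical helper, not part of this PR. In `app.py`, the check could wrap something like `client.admin.command("ping")`.

```python
import time
from typing import Callable


def wait_until(
    check: Callable[[], bool],
    timeout: float = 30.0,
    interval: float = 1.0,
) -> bool:
    """Poll `check` until it returns True or `timeout` seconds have elapsed.

    Exceptions raised by `check` are treated as "not ready yet".
    Returns True on success and False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            if check():
                return True
        except Exception:
            # e.g. connection refused while the server is still starting
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

The check always runs at least once, even with `timeout=0`, so a server that is already up is never reported as unavailable.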
14 changes: 14 additions & 0 deletions ci/scripts/e2e-source-test.sh
@@ -61,6 +61,20 @@ echo "--- starting risingwave cluster"
 RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
 cargo make ci-start ci-1cn-1fe-with-recovery
 
+echo "--- mongodb cdc test"
+# install the mongo shell
+wget http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
+wget https://repo.mongodb.org/apt/ubuntu/dists/focal/mongodb-org/4.4/multiverse/binary-amd64/mongodb-org-shell_4.4.28_amd64.deb
+dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb
+dpkg -i mongodb-org-shell_4.4.28_amd64.deb
+
+echo '> ping mongodb'
+echo 'db.runCommand({ping: 1})' | mongo mongodb://mongodb:27017
+echo '> rs config'
+echo 'rs.conf()' | mongo mongodb://mongodb:27017
+echo '> run test..'
+sqllogictest -p 4566 -d dev './e2e_test/source/cdc/mongodb/**/*.slt'
+
 echo "--- inline cdc test"
 export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
 sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt'
1 change: 1 addition & 0 deletions ci/scripts/gen-integration-test-yaml.py
@@ -12,6 +12,7 @@
     'schema-registry': ['json'],
     'mysql-cdc': ['json'],
     'postgres-cdc': ['json'],
+    'mongodb-cdc': ['json'],
     'mysql-sink': ['json'],
     'postgres-sink': ['json'],
     'iceberg-cdc': ['json'],
28 changes: 28 additions & 0 deletions e2e_test/source/cdc/mongodb/mongodb_basic.slt
@@ -0,0 +1,28 @@
# CDC source basic test
control substitution on

statement ok
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
connector = 'mongodb-cdc',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'random_data.*'
);

statement ok
CREATE MATERIALIZED VIEW normalized_users AS
SELECT
payload ->> 'name' as name,
payload ->> 'email' as email,
payload ->> 'address' as address
FROM
users;

sleep 5s

query I
select count(*) from normalized_users;
----
55

statement ok
DROP TABLE users cascade
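
Note: the materialized view projects three text fields out of the JSONB `payload` column with `->>`. The sketch below approximates that projection in Python, assuming `payload` holds the whole MongoDB document as the generator inserts it. `json_get_text` and `normalize_user` are hypothetical names used only for illustration, and the handling of non-string values is a simplification of the SQL operator.

```python
import json
from typing import Any, Dict, Optional


def json_get_text(payload: Dict[str, Any], key: str) -> Optional[str]:
    """Approximate `payload ->> key`.

    Returns the field as text, or None (standing in for SQL NULL) when the
    key is absent or its value is JSON null.
    """
    value = payload.get(key)
    if value is None:
        return None
    if isinstance(value, str):
        return value  # strings come back without surrounding quotes
    return json.dumps(value)  # other JSON values are rendered as JSON text


def normalize_user(payload: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Mirror the three columns selected by `normalized_users`."""
    return {key: json_get_text(payload, key) for key in ("name", "email", "address")}
```

A document missing one of the three keys still produces a row, with `None` in that column, which is why the test can assert on the row count alone.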
41 changes: 7 additions & 34 deletions integration_tests/debezium-mongo/docker-compose.yaml
@@ -28,32 +28,12 @@ services:
       service: message_queue
 
   mongodb:
-    image: mongo:4.4
-    container_name: mongodb
-    ports:
-      - "27017:27017"
-    command: --replSet rs0 --oplogSize 128
-    restart: always
-    healthcheck:
-      test: "echo 'db.runCommand({ping: 1})' | mongo"
-      interval: 5s
-      timeout: 10s
-      retries: 3
+    extends: ../mongodb/docker-compose.yaml
+    service: mongodb
 
   mongodb-setup:
-    image: mongo:4.4
-    container_name: mongodb-setup
-    depends_on:
-      - mongodb
-    entrypoint:
-      [
-        "bash",
-        "-c",
-        "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10"
-      ]
-    restart: "no"
-    volumes:
-      - ./config-replica.js:/config-replica.js
+    extends: ../mongodb/docker-compose.yaml
+    service: mongodb-setup
 
   debezium:
     image: debezium/connect:1.9
@@ -78,16 +58,9 @@ services:
       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://message_queue:8081
 
   random_data_generator:
-    build:
-      context: .
-      dockerfile: Dockerfile.generator
-    container_name: random_data_generator
-    depends_on:
-      - mongodb
-    environment:
-      MONGO_HOST: mongodb
-      MONGO_PORT: 27017
-      MONGO_DB_NAME: random_data
+    extends: ../mongodb/docker-compose.yaml
+    service: random_data_generator
+
 
   register-mongodb-connector:
     image: curlimages/curl:7.79.1
7 changes: 7 additions & 0 deletions integration_tests/mongodb-cdc/create_mv.sql
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW normalized_users AS
SELECT
payload ->> 'name' as name,
payload ->> 'email' as email,
payload ->> 'address' as address
FROM
users;
5 changes: 5 additions & 0 deletions integration_tests/mongodb-cdc/create_source.sql
@@ -0,0 +1,5 @@
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
connector = 'mongodb-cdc',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'random_data.*'
);
1 change: 1 addition & 0 deletions integration_tests/mongodb-cdc/data_check
@@ -0,0 +1 @@
users,normalized_users
48 changes: 48 additions & 0 deletions integration_tests/mongodb-cdc/docker-compose.yaml
@@ -0,0 +1,48 @@
---
version: "3"
services:
  risingwave-standalone:
    extends:
      file: ../../docker/docker-compose.yml
      service: risingwave-standalone
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
  mongodb:
    extends:
      file: ../mongodb/docker-compose.yaml
      service: mongodb
  mongodb-setup:
    extends:
      file: ../mongodb/docker-compose.yaml
      service: mongodb-setup
  random_data_generator:
    extends:
      file: ../mongodb/docker-compose.yaml
      service: random_data_generator

volumes:
  risingwave-standalone:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
name: risingwave-compose
7 changes: 7 additions & 0 deletions integration_tests/mongodb/config-replica.js
@@ -0,0 +1,7 @@
rsconf = {
  _id: "rs0",
  members: [{ _id: 0, host: "mongodb:27017", priority: 1.0 }],
};
rs.initiate(rsconf);
rs.status();
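
Note: the same single-member initiation could be driven from Python rather than the `mongo` shell, since `rs.initiate(rsconf)` corresponds to MongoDB's `replSetInitiate` admin command. A rough sketch, assuming `pymongo` is installed; `build_rs_config` and `initiate_replica_set` are hypothetical helper names, not part of this PR.

```python
from typing import Any, Dict


def build_rs_config(name: str, host: str) -> Dict[str, Any]:
    """Build a single-member replica set config mirroring config-replica.js."""
    return {
        "_id": name,
        "members": [{"_id": 0, "host": host, "priority": 1.0}],
    }


def initiate_replica_set(url: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """Send replSetInitiate to the node at `url` (assumes pymongo is available)."""
    import pymongo  # imported lazily so the config builder works without it

    # directConnection=True talks to this node directly instead of trying to
    # discover a replica set that has not been initiated yet.
    client = pymongo.MongoClient(url, directConnection=True)
    try:
        return client.admin.command("replSetInitiate", config)
    finally:
        client.close()
```

With the values used in this PR, the call would look like `initiate_replica_set("mongodb://mongodb:27017", build_rs_config("rs0", "mongodb:27017"))`.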

39 changes: 39 additions & 0 deletions integration_tests/mongodb/docker-compose.yaml
@@ -0,0 +1,39 @@
version: "3"
services:
  mongodb:
    image: mongo:4.4
    container_name: mongodb
    ports:
      - "27017:27017"
    command: --replSet rs0 --oplogSize 128
    restart: always
    healthcheck:
      test: "echo 'db.runCommand({ping: 1})' | mongo"
      interval: 5s
      timeout: 10s
      retries: 3
  mongodb-setup:
    image: mongo:4.4
    container_name: mongodb-setup
    depends_on:
      - mongodb
    entrypoint:
      [
        "bash",
        "-c",
        "sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10"
      ]
    restart: "no"
    volumes:
      - ./config-replica.js:/config-replica.js
  random_data_generator:
    build:
      context: .
      dockerfile: Dockerfile.generator
    container_name: random_data_generator
    depends_on:
      - mongodb
    environment:
      MONGO_HOST: mongodb
      MONGO_PORT: 27017
      MONGO_DB_NAME: random_data
2 changes: 2 additions & 0 deletions integration_tests/mongodb/requirements.txt
@@ -0,0 +1,2 @@
pymongo
Faker
@@ -0,0 +1,41 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.source.common;

public class CdcConnectorException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    public CdcConnectorException() {}

    public CdcConnectorException(String message) {
        super(message);
    }

    public CdcConnectorException(String message, Throwable cause) {
        super(message, cause);
    }

    public CdcConnectorException(Throwable cause) {
        super(cause);
    }

    public CdcConnectorException(
            String message,
            Throwable cause,
            boolean enableSuppression,
            boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}