Skip to content

Commit 910b047

Browse files
test(mongodb-cdc): add integration and e2e test (#15003) (#15845)
Co-authored-by: StrikeW <[email protected]>
1 parent a20ea5a commit 910b047

File tree

25 files changed

+574
-104
lines changed

25 files changed

+574
-104
lines changed

ci/docker-compose.yml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ services:
7777
- db
7878
- message_queue
7979
- schemaregistry
80+
- mongodb
81+
- mongodb-setup
82+
- mongo_data_generator
8083
volumes:
8184
- ..:/risingwave
8285

@@ -274,3 +277,42 @@ services:
274277
interval: 5s
275278
timeout: 5s
276279
retries: 5
280+
281+
mongodb:
282+
image: mongo:4.4
283+
ports:
284+
- "27017"
285+
command: --replSet rs0 --oplogSize 128
286+
restart: always
287+
healthcheck:
288+
test: "echo 'db.runCommand({ping: 1})' | mongo"
289+
interval: 5s
290+
timeout: 10s
291+
retries: 3
292+
293+
mongodb-setup:
294+
image: mongo:4.4
295+
container_name: mongodb-setup
296+
depends_on:
297+
- mongodb
298+
entrypoint:
299+
[
300+
"bash",
301+
"-c",
302+
"sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10"
303+
]
304+
restart: "no"
305+
volumes:
306+
- ./mongodb/config-replica.js:/config-replica.js
307+
308+
mongo_data_generator:
309+
build:
310+
context: .
311+
dockerfile: ./mongodb/Dockerfile.generator
312+
container_name: mongo_data_generator
313+
depends_on:
314+
- mongodb
315+
environment:
316+
MONGO_HOST: mongodb
317+
MONGO_PORT: 27017
318+
MONGO_DB_NAME: random_data

ci/mongodb/Dockerfile.generator

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Use an official Python runtime as a parent image
2+
FROM python:3.8-slim
3+
4+
# Set the working directory to /app
5+
WORKDIR /app
6+
7+
# Copy the requirements file into the container at /app
8+
COPY ./mongodb/requirements.txt /app
9+
COPY ./mongodb/app.py /app
10+
11+
# Install any needed packages specified in requirements.txt
12+
RUN pip install -r requirements.txt
13+
14+
# Make port 5000 available to the world outside this container
15+
EXPOSE 5000
16+
17+
# Define environment variables
18+
ENV MONGO_HOST mongodb
19+
ENV MONGO_PORT 27017
20+
ENV MONGO_DB_NAME random_data
21+
22+
# Run app.py when the container launches
23+
CMD ["python", "app.py"]

ci/mongodb/app.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import os
2+
import pymongo
3+
from faker import Faker
4+
5+
# To check the data through mongosh or mongo, run the following command:
6+
# > mongosh mongodb://admin:[email protected]:27017
7+
# > rs0 [direct: primary] test> use random_data
8+
# > rs0 [direct: primary] random_data> db.users.find()
9+
# > rs0 [direct: primary] random_data> db.users.count()
10+
11+
# Connect to MongoDB
12+
mongo_host = os.environ["MONGO_HOST"]
13+
mongo_port = os.environ["MONGO_PORT"]
14+
mongo_db_name = os.environ["MONGO_DB_NAME"]
15+
16+
url = f"mongodb://{mongo_host}:{mongo_port}"
17+
client = pymongo.MongoClient(url)
18+
db = client[mongo_db_name]
19+
20+
# Generate random data
21+
fake = Faker()
22+
collection = db["users"]
23+
24+
for _ in range(55):
25+
user_data = {
26+
"name": fake.name(),
27+
"address": fake.address(),
28+
"email": fake.email(),
29+
}
30+
collection.insert_one(user_data)
31+
32+
# Count the number of records in the collection
33+
total_records = collection.count_documents({})
34+
35+
# Close the MongoDB connection
36+
client.close()
37+
print(f"Random data generated and inserted into MongoDB: {url}")
38+
39+
# Print insertion summary
40+
print("Insertion summary:")
41+
print(f"Total records in the collection: {total_records}")

ci/scripts/e2e-source-test.sh

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,20 @@ echo "--- starting risingwave cluster"
6161
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
6262
cargo make ci-start ci-1cn-1fe-with-recovery
6363

64+
echo "--- mongodb cdc test"
65+
# install the mongo shell
66+
wget http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
67+
wget https://repo.mongodb.org/apt/ubuntu/dists/focal/mongodb-org/4.4/multiverse/binary-amd64/mongodb-org-shell_4.4.28_amd64.deb
68+
dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb
69+
dpkg -i mongodb-org-shell_4.4.28_amd64.deb
70+
71+
echo '> ping mongodb'
72+
echo 'db.runCommand({ping: 1})' | mongo mongodb://mongodb:27017
73+
echo '> rs config'
74+
echo 'rs.conf()' | mongo mongodb://mongodb:27017
75+
echo '> run test..'
76+
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/mongodb/**/*.slt'
77+
6478
echo "--- inline cdc test"
6579
export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
6680
sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt'

ci/scripts/gen-integration-test-yaml.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
'schema-registry': ['json'],
1313
'mysql-cdc': ['json'],
1414
'postgres-cdc': ['json'],
15+
'mongodb-cdc': ['json'],
1516
'mysql-sink': ['json'],
1617
'postgres-sink': ['json'],
1718
'iceberg-cdc': ['json'],
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# CDC source basic test
2+
control substitution on
3+
4+
statement ok
5+
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
6+
connector = 'mongodb-cdc',
7+
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
8+
collection.name = 'random_data.*'
9+
);
10+
11+
statement ok
12+
CREATE MATERIALIZED VIEW normalized_users AS
13+
SELECT
14+
payload ->> 'name' as name,
15+
payload ->> 'email' as email,
16+
payload ->> 'address' as address
17+
FROM
18+
users;
19+
20+
sleep 5s
21+
22+
query I
23+
select count(*) from normalized_users;
24+
----
25+
55
26+
27+
statement ok
28+
DROP TABLE users cascade

integration_tests/debezium-mongo/docker-compose.yaml

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,32 +28,12 @@ services:
2828
service: message_queue
2929

3030
mongodb:
31-
image: mongo:4.4
32-
container_name: mongodb
33-
ports:
34-
- "27017:27017"
35-
command: --replSet rs0 --oplogSize 128
36-
restart: always
37-
healthcheck:
38-
test: "echo 'db.runCommand({ping: 1})' | mongo"
39-
interval: 5s
40-
timeout: 10s
41-
retries: 3
31+
extends: ../mongodb/docker-compose.yaml
32+
service: mongodb
4233

4334
mongodb-setup:
44-
image: mongo:4.4
45-
container_name: mongodb-setup
46-
depends_on:
47-
- mongodb
48-
entrypoint:
49-
[
50-
"bash",
51-
"-c",
52-
"sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10"
53-
]
54-
restart: "no"
55-
volumes:
56-
- ./config-replica.js:/config-replica.js
35+
extends: ../mongodb/docker-compose.yaml
36+
service: mongodb-setup
5737

5838
debezium:
5939
image: debezium/connect:1.9
@@ -78,16 +58,9 @@ services:
7858
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://message_queue:8081
7959

8060
random_data_generator:
81-
build:
82-
context: .
83-
dockerfile: Dockerfile.generator
84-
container_name: random_data_generator
85-
depends_on:
86-
- mongodb
87-
environment:
88-
MONGO_HOST: mongodb
89-
MONGO_PORT: 27017
90-
MONGO_DB_NAME: random_data
61+
extends: ../mongodb/docker-compose.yaml
62+
service: random_data_generator
63+
9164

9265
register-mongodb-connector:
9366
image: curlimages/curl:7.79.1
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CREATE MATERIALIZED VIEW normalized_users AS
2+
SELECT
3+
payload ->> 'name' as name,
4+
payload ->> 'email' as email,
5+
payload ->> 'address' as address
6+
FROM
7+
users;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
2+
connector = 'mongodb-cdc',
3+
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
4+
collection.name = 'random_data.*'
5+
);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
users,normalized_users
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
---
2+
version: "3"
3+
services:
4+
risingwave-standalone:
5+
extends:
6+
file: ../../docker/docker-compose.yml
7+
service: risingwave-standalone
8+
etcd-0:
9+
extends:
10+
file: ../../docker/docker-compose.yml
11+
service: etcd-0
12+
grafana-0:
13+
extends:
14+
file: ../../docker/docker-compose.yml
15+
service: grafana-0
16+
minio-0:
17+
extends:
18+
file: ../../docker/docker-compose.yml
19+
service: minio-0
20+
prometheus-0:
21+
extends:
22+
file: ../../docker/docker-compose.yml
23+
service: prometheus-0
24+
mongodb:
25+
extends:
26+
file: ../mongodb/docker-compose.yaml
27+
service: mongodb
28+
mongodb-setup:
29+
extends:
30+
file: ../mongodb/docker-compose.yaml
31+
service: mongodb-setup
32+
random_data_generator:
33+
extends:
34+
file: ../mongodb/docker-compose.yaml
35+
service: random_data_generator
36+
37+
volumes:
38+
risingwave-standalone:
39+
external: false
40+
etcd-0:
41+
external: false
42+
grafana-0:
43+
external: false
44+
minio-0:
45+
external: false
46+
prometheus-0:
47+
external: false
48+
name: risingwave-compose
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
rsconf = {
2+
_id: "rs0",
3+
members: [{ _id: 0, host: "mongodb:27017", priority: 1.0 }],
4+
};
5+
rs.initiate(rsconf);
6+
rs.status();
7+
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
version: "3"
2+
services:
3+
mongodb:
4+
image: mongo:4.4
5+
container_name: mongodb
6+
ports:
7+
- "27017:27017"
8+
command: --replSet rs0 --oplogSize 128
9+
restart: always
10+
healthcheck:
11+
test: "echo 'db.runCommand({ping: 1})' | mongo"
12+
interval: 5s
13+
timeout: 10s
14+
retries: 3
15+
mongodb-setup:
16+
image: mongo:4.4
17+
container_name: mongodb-setup
18+
depends_on:
19+
- mongodb
20+
entrypoint:
21+
[
22+
"bash",
23+
"-c",
24+
"sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10"
25+
]
26+
restart: "no"
27+
volumes:
28+
- ./config-replica.js:/config-replica.js
29+
random_data_generator:
30+
build:
31+
context: .
32+
dockerfile: Dockerfile.generator
33+
container_name: random_data_generator
34+
depends_on:
35+
- mongodb
36+
environment:
37+
MONGO_HOST: mongodb
38+
MONGO_PORT: 27017
39+
MONGO_DB_NAME: random_data
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pymongo
2+
Faker
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright 2024 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.risingwave.connector.source.common;
16+
17+
public class CdcConnectorException extends RuntimeException {
18+
private static final long serialVersionUID = 1L;
19+
20+
public CdcConnectorException() {}
21+
22+
public CdcConnectorException(String message) {
23+
super(message);
24+
}
25+
26+
public CdcConnectorException(String message, Throwable cause) {
27+
super(message, cause);
28+
}
29+
30+
public CdcConnectorException(Throwable cause) {
31+
super(cause);
32+
}
33+
34+
public CdcConnectorException(
35+
String message,
36+
Throwable cause,
37+
boolean enableSuppression,
38+
boolean writableStackTrace) {
39+
super(message, cause, enableSuppression, writableStackTrace);
40+
}
41+
}

0 commit comments

Comments
 (0)