Skip to content

Commit a267376

Browse files
committed
add integration test
1 parent bcabf96 commit a267376

File tree

9 files changed

+234
-0
lines changed

9 files changed

+234
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use an official Python runtime as a parent image
2+
FROM python:3.8-slim
3+
4+
# Set the working directory to /app
5+
WORKDIR /app
6+
7+
# Copy the requirements file into the container at /app
8+
COPY requirements.txt /app
9+
10+
# Install any needed packages specified in requirements.txt
11+
RUN pip install -r requirements.txt
12+
13+
# Copy the rest of the application code
14+
COPY . /app
15+
16+
# Make port 5000 available to the world outside this container
17+
EXPOSE 5000
18+
19+
# Define environment variables
20+
ENV MONGO_HOST mongodb
21+
ENV MONGO_PORT 27017
22+
ENV MONGO_DB_NAME random_data
23+
24+
# Run app.py when the container launches
25+
CMD ["python", "app.py"]

integration_tests/mongodb-cdc/app.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import os
2+
import pymongo
3+
from faker import Faker
4+
5+
# To check the data through mongosh or mongo, run the following command:
6+
# > mongosh mongodb://admin:[email protected]:27017
7+
# > rs0 [direct: primary] test> use random_data
8+
# > rs0 [direct: primary] random_data> db.users.find()
9+
# > rs0 [direct: primary] random_data> db.users.count()
10+
11+
# Connect to MongoDB
12+
mongo_host = os.environ["MONGO_HOST"]
13+
mongo_port = os.environ["MONGO_PORT"]
14+
mongo_db_name = os.environ["MONGO_DB_NAME"]
15+
16+
url = f"mongodb://{mongo_host}:{mongo_port}"
17+
client = pymongo.MongoClient(url)
18+
db = client[mongo_db_name]
19+
20+
# Generate random data
21+
fake = Faker()
22+
collection = db["users"]
23+
24+
for _ in range(1000):
25+
user_data = {
26+
"name": fake.name(),
27+
"address": fake.address(),
28+
"email": fake.email(),
29+
}
30+
collection.insert_one(user_data)
31+
32+
# Count the number of records in the collection
33+
total_records = collection.count_documents({})
34+
35+
# Close the MongoDB connection
36+
client.close()
37+
print(f"Random data generated and inserted into MongoDB: {url}")
38+
39+
# Print insertion summary
40+
print("Insertion summary:")
41+
print(f"Total records in the collection: {total_records}")
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
rsconf = {
2+
_id: "rs0",
3+
members: [{ _id: 0, host: "mongodb:27017", priority: 1.0 }],
4+
};
5+
rs.initiate(rsconf);
6+
rs.status();
7+
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CREATE MATERIALIZED VIEW normalized_users AS
2+
SELECT
3+
payload ->> 'name' as name,
4+
payload ->> 'email' as email,
5+
payload ->> 'address' as address
6+
FROM
7+
users;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
2+
connector = 'mongodb-cdc',
3+
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
4+
collection.name = 'random_data.*'
5+
);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
users,normalized_users
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
version: '3.1'
2+
3+
services:
4+
mongodb:
5+
image: mongodb/mongodb-community-server:4.4.23-ubi8
6+
container_name: mongodb
7+
ports:
8+
- "27017:27017"
9+
command: --replSet rs0 --oplogSize 128
10+
restart: always
11+
healthcheck:
12+
test: "echo 'db.runCommand({ping: 1})' | mongo"
13+
interval: 5s
14+
timeout: 10s
15+
retries: 3
16+
17+
mongodb-setup:
18+
image: mongodb/mongodb-community-server:4.4.23-ubi8
19+
container_name: mongodb-setup
20+
depends_on:
21+
- mongodb
22+
entrypoint:
23+
[
24+
"bash",
25+
"-c",
26+
"sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10"
27+
]
28+
restart: "no"
29+
volumes:
30+
- ./config-replica.js:/config-replica.js
31+
32+
random_data_generator:
33+
build:
34+
context: .
35+
dockerfile: Dockerfile.generator
36+
container_name: random_data_generator
37+
depends_on:
38+
- mongodb
39+
environment:
40+
MONGO_HOST: mongodb
41+
MONGO_PORT: 27017
42+
MONGO_DB_NAME: random_data
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pymongo
2+
Faker
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2024 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.risingwave.connector.source;
16+
17+
import com.mongodb.client.MongoClient;
18+
import com.mongodb.client.MongoClients;
19+
import com.mongodb.client.MongoCollection;
20+
import com.mongodb.client.MongoDatabase;
21+
import com.mongodb.client.model.InsertManyOptions;
22+
import com.risingwave.connector.ConnectorServiceImpl;
23+
import com.risingwave.proto.ConnectorServiceProto;
24+
import io.grpc.*;
25+
import java.util.ArrayList;
26+
import java.util.HashMap;
27+
import java.util.Iterator;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Random;
31+
import java.util.concurrent.Callable;
32+
import java.util.concurrent.Executors;
33+
import org.bson.Document;
34+
import org.bson.types.ObjectId;
35+
import org.junit.*;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
import org.testcontainers.containers.GenericContainer;
39+
import org.testcontainers.containers.MongoDBContainer;
40+
41+
import static java.lang.Thread.sleep;
42+
43+
public class MongoDbSourceTest {
44+
private static final Logger LOG = LoggerFactory.getLogger(MongoDbSourceTest.class.getName());
45+
46+
public static Server connectorService =
47+
ServerBuilder.forPort(SourceTestClient.DEFAULT_PORT)
48+
.addService(new ConnectorServiceImpl())
49+
.build();
50+
51+
public static SourceTestClient testClient =
52+
new SourceTestClient(
53+
Grpc.newChannelBuilder(
54+
"localhost:" + SourceTestClient.DEFAULT_PORT,
55+
InsecureChannelCredentials.create())
56+
.build());
57+
58+
@BeforeClass
59+
public static void init() {
60+
try {
61+
connectorService.start();
62+
} catch (Exception e) {
63+
LOG.error("failed to start connector service", e);
64+
Assert.fail();
65+
}
66+
}
67+
68+
@AfterClass
69+
public static void cleanup() {
70+
connectorService.shutdown();
71+
}
72+
73+
// manually test
74+
@Test
75+
@Ignore
76+
public void testSnapshotLoad() throws Exception {
77+
Map<String, String> props = new HashMap<>();
78+
props.put("mongodb.url", "mongodb://localhost:27017/?replicaSet=rs0");
79+
props.put("collection.name", "dev.*");
80+
Iterator<ConnectorServiceProto.GetEventStreamResponse> eventStream =
81+
testClient.getEventStream(ConnectorServiceProto.SourceType.MONGODB, 3001, props);
82+
Callable<Integer> countTask =
83+
() -> {
84+
int count = 0;
85+
while (eventStream.hasNext()) {
86+
List<ConnectorServiceProto.CdcMessage> messages =
87+
eventStream.next().getEventsList();
88+
for (ConnectorServiceProto.CdcMessage msg : messages) {
89+
if (!msg.getPayload().isBlank()) {
90+
count++;
91+
}
92+
}
93+
if (count >= 10) {
94+
return count;
95+
}
96+
}
97+
return count;
98+
};
99+
100+
var pool = Executors.newFixedThreadPool(1);
101+
var result = pool.submit(countTask);
102+
Assert.assertEquals(10, result.get().intValue());
103+
}
104+
}

0 commit comments

Comments
 (0)