Skip to content

Commit 4b008ac

Browse files
WillyKiddStrikeW
andauthored
test(connector): add test cases for debezium json test (risingwavelabs#8334)
Co-authored-by: StrikeW <[email protected]>
1 parent 79b499c commit 4b008ac

File tree

13 files changed

+1058
-71
lines changed

13 files changed

+1058
-71
lines changed

java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/converters/DatetimeTypeConverter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
public class DatetimeTypeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
2727

2828
private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
29-
private static final String EPOCH_DAY = "1970-01-01";
3029

3130
@Override
3231
public void configure(Properties props) {
@@ -40,7 +39,7 @@ public void converterFor(
4039
SchemaBuilder schemaBuilder = null;
4140
Converter converter = null;
4241
if ("DATE".equals(sqlType)) {
43-
schemaBuilder = SchemaBuilder.string().name("risingwave.cdc.date.string");
42+
schemaBuilder = SchemaBuilder.string().name("rw.cdc.date.string");
4443
converter = this::convertDate;
4544
}
4645
if (schemaBuilder != null) {
@@ -50,7 +49,7 @@ public void converterFor(
5049

5150
private String convertDate(Object input) {
5251
if (input == null) {
53-
return EPOCH_DAY;
52+
return null;
5453
}
5554
var epochDay = Date.toEpochDay(input, null);
5655
LocalDate date = LocalDate.ofEpochDay(epochDay);
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<parent>
5+
<artifactId>java-parent</artifactId>
6+
<groupId>com.risingwave.java</groupId>
7+
<version>1.0-SNAPSHOT</version>
8+
<relativePath>../../pom.xml</relativePath>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
<name>risingwave-source-test</name>
12+
<packaging>jar</packaging>
13+
<artifactId>risingwave-source-test</artifactId>
14+
15+
<properties>
16+
<testcontainers.version>1.17.6</testcontainers.version>
17+
</properties>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>junit</groupId>
22+
<artifactId>junit</artifactId>
23+
<scope>test</scope>
24+
</dependency>
25+
<dependency>
26+
<groupId>org.assertj</groupId>
27+
<artifactId>assertj-core</artifactId>
28+
<version>3.24.2</version>
29+
<scope>test</scope>
30+
</dependency>
31+
<dependency>
32+
<groupId>com.zaxxer</groupId>
33+
<artifactId>HikariCP</artifactId>
34+
<version>5.0.1</version>
35+
<scope>test</scope>
36+
</dependency>
37+
<dependency>
38+
<groupId>org.testcontainers</groupId>
39+
<artifactId>testcontainers</artifactId>
40+
<version>${testcontainers.version}</version>
41+
<scope>test</scope>
42+
</dependency>
43+
<dependency>
44+
<groupId>org.testcontainers</groupId>
45+
<artifactId>mysql</artifactId>
46+
<version>${testcontainers.version}</version>
47+
<scope>test</scope>
48+
</dependency>
49+
<dependency>
50+
<groupId>org.testcontainers</groupId>
51+
<artifactId>postgresql</artifactId>
52+
<version>${testcontainers.version}</version>
53+
<scope>test</scope>
54+
</dependency>
55+
<dependency>
56+
<groupId>com.fasterxml.jackson.core</groupId>
57+
<artifactId>jackson-databind</artifactId>
58+
<version>${jackson.version}</version>
59+
<scope>test</scope>
60+
</dependency>
61+
<dependency>
62+
<groupId>com.fasterxml.jackson.core</groupId>
63+
<artifactId>jackson-core</artifactId>
64+
<version>${jackson.version}</version>
65+
<scope>test</scope>
66+
</dependency>
67+
68+
<dependency>
69+
<groupId>com.risingwave.java</groupId>
70+
<artifactId>risingwave-source-cdc</artifactId>
71+
<scope>test</scope>
72+
</dependency>
73+
<dependency>
74+
<groupId>com.risingwave.java</groupId>
75+
<artifactId>risingwave-connector-service</artifactId>
76+
<scope>test</scope>
77+
</dependency>
78+
</dependencies>
79+
</project>
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.risingwave.connector;
16+
17+
import static org.assertj.core.api.Assertions.*;
18+
import static org.junit.Assert.assertEquals;
19+
20+
import com.risingwave.proto.ConnectorServiceProto.*;
21+
import io.grpc.*;
22+
import java.io.IOException;
23+
import java.sql.Connection;
24+
import java.sql.ResultSet;
25+
import java.sql.SQLException;
26+
import java.util.Iterator;
27+
import java.util.List;
28+
import java.util.concurrent.*;
29+
import javax.sql.DataSource;
30+
import org.junit.AfterClass;
31+
import org.junit.BeforeClass;
32+
import org.junit.Ignore;
33+
import org.junit.Test;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
import org.testcontainers.containers.MySQLContainer;
37+
import org.testcontainers.utility.MountableFile;
38+
39+
public class MySQLSourceTest {
40+
41+
static final Logger LOG = LoggerFactory.getLogger(MySQLSourceTest.class.getName());
42+
43+
private static final MySQLContainer<?> mysql =
44+
new MySQLContainer<>("mysql:8.0")
45+
.withDatabaseName("test")
46+
.withUsername("root")
47+
.withCopyFileToContainer(
48+
MountableFile.forClasspathResource("my.cnf"), "/etc/my.cnf");
49+
50+
public static Server connectorServer =
51+
ServerBuilder.forPort(ConnectorService.DEFAULT_PORT)
52+
.addService(new ConnectorServiceImpl())
53+
.build();
54+
55+
public static SourceTestClient testClient =
56+
new SourceTestClient(
57+
Grpc.newChannelBuilder(
58+
"localhost:" + ConnectorService.DEFAULT_PORT,
59+
InsecureChannelCredentials.create())
60+
.build());
61+
62+
private static DataSource mysqlDataSource;
63+
64+
@BeforeClass
65+
public static void init() {
66+
// generate orders.tbl test data
67+
SourceTestClient.genOrdersTable(10000);
68+
// start connector server and mysql...
69+
try {
70+
connectorServer.start();
71+
LOG.info("connector service started");
72+
mysql.withCopyFileToContainer(
73+
MountableFile.forClasspathResource("orders.tbl"), "/home/orders.tbl");
74+
mysql.start();
75+
mysqlDataSource = SourceTestClient.getDataSource(mysql);
76+
LOG.info("mysql started");
77+
} catch (IOException e) {
78+
fail("IO exception: ", e);
79+
}
80+
// check mysql configuration...
81+
try {
82+
Connection connection = SourceTestClient.connect(mysqlDataSource);
83+
ResultSet resultSet =
84+
SourceTestClient.performQuery(
85+
connection, testClient.sqlStmts.getProperty("mysql.bin_log"));
86+
assertThat(resultSet.getString("Value")).isEqualTo("ON").as("MySQL: bin_log ON");
87+
connection.close();
88+
} catch (SQLException e) {
89+
fail("SQL exception: ", e);
90+
}
91+
}
92+
93+
@AfterClass
94+
public static void cleanup() {
95+
connectorServer.shutdown();
96+
mysql.stop();
97+
}
98+
99+
// create a TPC-H orders table in mysql
100+
// insert 10,000 rows into orders
101+
// check if the number of changes debezium captures is 10,000
102+
@Test
103+
public void testLines() throws InterruptedException, SQLException {
104+
ExecutorService executorService = Executors.newFixedThreadPool(1);
105+
Connection connection = SourceTestClient.connect(mysqlDataSource);
106+
String query = testClient.sqlStmts.getProperty("tpch.create.orders");
107+
SourceTestClient.performQuery(connection, query);
108+
query =
109+
"LOAD DATA INFILE '/home/orders.tbl' "
110+
+ "INTO TABLE orders "
111+
+ "CHARACTER SET UTF8 "
112+
+ "FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n';";
113+
SourceTestClient.performQuery(connection, query);
114+
Iterator<GetEventStreamResponse> eventStream =
115+
testClient.getEventStreamStart(mysql, SourceType.MYSQL, "test", "orders");
116+
Callable<Integer> countTask =
117+
() -> {
118+
int count = 0;
119+
while (eventStream.hasNext()) {
120+
List<CdcMessage> messages = eventStream.next().getEventsList();
121+
for (CdcMessage ignored : messages) {
122+
count++;
123+
}
124+
if (count == 10000) {
125+
return count;
126+
}
127+
}
128+
return count;
129+
};
130+
Future<Integer> countResult = executorService.submit(countTask);
131+
try {
132+
int count = countResult.get();
133+
LOG.info("number of cdc messages received: {}", count);
134+
assertEquals(count, 10000);
135+
} catch (ExecutionException e) {
136+
fail("Execution exception: ", e);
137+
}
138+
connection.close();
139+
}
140+
141+
// generates test cases for the risingwave debezium parser
142+
@Ignore
143+
@Test
144+
public void getTestJson() throws InterruptedException, SQLException {
145+
Connection connection = SourceTestClient.connect(mysqlDataSource);
146+
String query =
147+
"CREATE TABLE IF NOT EXISTS orders ("
148+
+ "O_KEY BIGINT NOT NULL, "
149+
+ "O_BOOL BOOLEAN, "
150+
+ "O_TINY TINYINT, "
151+
+ "O_INT INT, "
152+
+ "O_REAL REAL, "
153+
+ "O_DOUBLE DOUBLE, "
154+
+ "O_DECIMAL DECIMAL(15, 2), "
155+
+ "O_CHAR CHAR(15), "
156+
+ "O_DATE DATE, "
157+
+ "O_TIME TIME, "
158+
+ "O_DATETIME DATETIME, "
159+
+ "O_TIMESTAMP TIMESTAMP, "
160+
+ "O_JSON JSON, "
161+
+ "PRIMARY KEY (O_KEY))";
162+
SourceTestClient.performQuery(connection, query);
163+
Iterator<GetEventStreamResponse> eventStream =
164+
testClient.getEventStreamStart(mysql, SourceType.MYSQL, "test", "orders");
165+
Thread t1 =
166+
new Thread(
167+
() -> {
168+
while (eventStream.hasNext()) {
169+
List<CdcMessage> messages = eventStream.next().getEventsList();
170+
for (CdcMessage msg : messages) {
171+
LOG.info("{}", msg.getPayload());
172+
}
173+
}
174+
});
175+
Thread.sleep(3000);
176+
t1.start();
177+
Thread.sleep(3000);
178+
// Q1: ordinary insert
179+
query =
180+
"INSERT INTO orders (O_KEY, O_BOOL, O_TINY, O_INT, O_REAL, O_DOUBLE, O_DECIMAL, O_CHAR, O_DATE, O_TIME, O_DATETIME, O_TIMESTAMP, O_JSON)"
181+
+ "VALUES(111, TRUE, -1, -1111, -11.11, -111.11111, -111.11, 'yes please', '1000-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:01.000000', '{\"k1\": \"v1\", \"k2\": 11}')";
182+
SourceTestClient.performQuery(connection, query);
183+
// Q2: update value of Q1 (value -> new value)
184+
query =
185+
"UPDATE orders SET O_BOOL = FALSE, "
186+
+ "O_TINY = 3, "
187+
+ "O_INT = 3333, "
188+
+ "O_REAL = 33.33, "
189+
+ "O_DOUBLE = 333.33333, "
190+
+ "O_DECIMAL = 333.33, "
191+
+ "O_CHAR = 'no thanks', "
192+
+ "O_DATE = '9999-12-31', "
193+
+ "O_TIME = '23:59:59', "
194+
+ "O_DATETIME = '5138-11-16 09:46:39', "
195+
+ "O_TIMESTAMP = '2038-01-09 03:14:07', "
196+
+ "O_JSON = '{\"k1\": \"v1_updated\", \"k2\": 33}' "
197+
+ "WHERE orders.O_KEY = 111";
198+
SourceTestClient.performQuery(connection, query);
199+
// Q3: delete value from Q1
200+
query = "DELETE FROM orders WHERE orders.O_KEY = 111";
201+
SourceTestClient.performQuery(connection, query);
202+
Thread.sleep(5000);
203+
connection.close();
204+
}
205+
}

0 commit comments

Comments
 (0)