Skip to content

Commit bf95473

Browse files
Eric FutabVersion
Eric Fu
andauthored
fix(connector): support more data types in JDBC sink (risingwavelabs#8678)
Signed-off-by: tabVersion <[email protected]> Co-authored-by: tabVersion <[email protected]>
1 parent 694c446 commit bf95473

File tree

9 files changed

+90
-25
lines changed

9 files changed

+90
-25
lines changed

ci/scripts/e2e-sink-test.sh

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
3131
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so
3232

3333
export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
34-
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk
34+
# TODO: Switch to stream_chunk encoding once it's completed, and then remove json encoding as well as this env var.
35+
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=json
3536

3637
echo "--- Download connector node package"
3738
buildkite-agent artifact download risingwave-connector.tar.gz ./
@@ -55,7 +56,7 @@ mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXIST
5556
# grant access to `test` for ci test user
5657
mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test.* TO 'mysqluser'@'%';"
5758
# create a table named t_remote
58-
mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE TABLE IF NOT EXISTS test.t_remote (id INT, name VARCHAR(255), PRIMARY KEY (id));"
59+
mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql
5960

6061
echo "--- preparing postgresql"
6162

@@ -65,7 +66,7 @@ export PGPASSWORD=postgres
6566
psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';"
6667
createdb -h db -U postgres test
6768
psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);"
68-
psql -h db -U postgres -d test -c "CREATE TABLE t_remote (id serial PRIMARY KEY, name VARCHAR (50) NOT NULL);"
69+
psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql
6970

7071
node_port=50051
7172
node_timeout=10
@@ -106,13 +107,9 @@ sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
106107
sleep 1
107108

108109
# check sink destination mysql using shell
109-
if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{
110-
if ($1 == 1 && $2 == "Alex") c1++;
111-
if ($1 == 3 && $2 == "Carl") c2++;
112-
if ($1 == 4 && $2 == "Doris") c3++;
113-
if ($1 == 5 && $2 == "Eve") c4++;
114-
if ($1 == 6 && $2 == "Frank") c5++; }
115-
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then
110+
diff -u ./e2e_test/sink/remote/mysql_expected_result.tsv \
111+
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote ORDER BY id")
112+
if [ $? -eq 0 ]; then
116113
echo "mysql sink check passed"
117114
else
118115
echo "The output is not as expected."

e2e_test/sink/remote/jdbc.check.pg.slt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
query I
44
select * from t_remote order by id;
55
----
6-
1 Alex
7-
3 Carl
8-
4 Doris
9-
5 Eve
10-
6 Frank
6+
1 Alex 28208 281620391 4986480304337356800 28162.0391 2.03 28162.0391 2023-03-20 10:18:30
7+
3 Carl 18300 1702307129 7878292368468104192 17023.07129 23.07 17023.07129 2023-03-20 10:18:32
8+
4 Doris 17250 151951802 3946135584462581760 1519518.02 18.02 1519518.02 2023-03-21 10:18:30
9+
5 Eve 9725 698160808 524334216698825600 69.8160808 69.81 69.8160808 2023-03-21 10:18:31
10+
6 Frank 28131 1233587627 8492820454814063616 123358.7627 58.76 123358.7627 2023-03-21 10:18:32

e2e_test/sink/remote/jdbc.load.slt

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
statement ok
2-
create table t_remote (id integer primary key, name varchar);
2+
create table t_remote (
3+
id integer primary key,
4+
v_varchar varchar,
5+
v_smallint smallint,
6+
v_integer integer,
7+
v_bigint bigint,
8+
v_decimal decimal,
9+
v_float float,
10+
v_double double,
11+
v_timestamp timestamp
12+
);
313

414
statement ok
515
create materialized view mv_remote as select * from t_remote;
@@ -19,16 +29,22 @@ CREATE SINK s_mysql FROM mv_remote WITH (
1929
);
2030

2131
statement ok
22-
INSERT INTO t_remote VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carl');
32+
INSERT INTO t_remote VALUES
33+
(1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'),
34+
(2, 'Bob', 10580, 2131030003, 3074255027698877876, 21310.30003, 10.3, 21310.30003, '2023-03-20 10:18:31'),
35+
(3, 'Carl', 18300, 1702307129, 7878292368468104216, 17023.07129, 23.07, 17023.07129, '2023-03-20 10:18:32');
2336

2437
statement ok
25-
INSERT INTO t_remote VALUES (4, 'Doris'), (5, 'Eve'), (6, 'Frank');
38+
INSERT INTO t_remote VALUES
39+
(4, 'Doris', 17250, 151951802, 3946135584462581863, 1519518.02, 18.02, 1519518.02, '2023-03-21 10:18:30'),
40+
(5, 'Eve', 9725, 698160808, 524334216698825611, 69.8160808, 69.81, 69.8160808, '2023-03-21 10:18:31'),
41+
(6, 'Frank', 28131, 1233587627, 8492820454814063326, 123358.7627, 58.76, 123358.7627, '2023-03-21 10:18:32');
2642

2743
statement ok
2844
FLUSH;
2945

3046
statement ok
31-
UPDATE t_remote SET name = 'Alex' WHERE id = 1;
47+
UPDATE t_remote SET v_varchar = 'Alex' WHERE id = 1;
3248

3349
statement ok
3450
DELETE FROM t_remote WHERE id = 2;
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
CREATE TABLE t_remote (
2+
id integer PRIMARY KEY,
3+
v_varchar varchar(100),
4+
v_smallint smallint,
5+
v_integer integer,
6+
v_bigint bigint,
7+
v_decimal decimal,
8+
v_float float,
9+
v_double double,
10+
v_timestamp timestamp
11+
);
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
1 Alex 28208 281620391 4986480304337356800 28162 2.03 28162.0391 2023-03-20 10:18:30
2+
3 Carl 18300 1702307129 7878292368468104192 17023 23.07 17023.07129 2023-03-20 10:18:32
3+
4 Doris 17250 151951802 3946135584462581760 1519518 18.02 1519518.02 2023-03-21 10:18:30
4+
5 Eve 9725 698160808 524334216698825600 70 69.81 69.8160808 2023-03-21 10:18:31
5+
6 Frank 28131 1233587627 8492820454814063616 123359 58.76 123358.7627 2023-03-21 10:18:32
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
CREATE TABLE t_remote (
2+
id integer PRIMARY KEY,
3+
v_varchar varchar(100),
4+
v_smallint smallint,
5+
v_integer integer,
6+
v_bigint bigint,
7+
v_decimal decimal,
8+
v_float real,
9+
v_double double precision,
10+
v_timestamp timestamp
11+
);

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.risingwave.proto.ConnectorServiceProto;
2323
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload;
2424
import com.risingwave.proto.Data;
25+
import java.math.BigDecimal;
26+
import java.sql.Timestamp;
2527
import java.util.Map;
2628

2729
public class JsonDeserializer implements Deserializer {
@@ -31,6 +33,8 @@ public JsonDeserializer(TableSchema tableSchema) {
3133
this.tableSchema = tableSchema;
3234
}
3335

36+
// Encoding here should be consistent with `datum_to_json_object()` in
37+
// src/connector/src/sink/mod.rs
3438
@Override
3539
public CloseableIterator<SinkRow> deserialize(
3640
ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch) {
@@ -113,6 +117,19 @@ private static Double castDouble(Object value) {
113117
}
114118
}
115119

120+
private static BigDecimal castDecimal(Object value) {
121+
if (value instanceof String) {
122+
// FIXME(eric): See `datum_to_json_object()` in src/connector/src/sink/mod.rs
123+
return new BigDecimal((String) value);
124+
} else if (value instanceof BigDecimal) {
125+
return (BigDecimal) value;
126+
} else {
127+
throw io.grpc.Status.INVALID_ARGUMENT
128+
.withDescription("unable to cast into double from " + value.getClass())
129+
.asRuntimeException();
130+
}
131+
}
132+
116133
private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
117134
switch (typeName) {
118135
case INT16:
@@ -132,13 +149,24 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj
132149
return castDouble(value);
133150
case FLOAT:
134151
return castDouble(value).floatValue();
152+
case DECIMAL:
153+
return castDecimal(value);
135154
case BOOLEAN:
136155
if (!(value instanceof Boolean)) {
137156
throw io.grpc.Status.INVALID_ARGUMENT
138157
.withDescription("Expected boolean, got " + value.getClass())
139158
.asRuntimeException();
140159
}
141160
return value;
161+
case TIMESTAMP:
162+
case TIMESTAMPTZ:
163+
if (!(value instanceof String)) {
164+
throw io.grpc.Status.INVALID_ARGUMENT
165+
.withDescription(
166+
"Expected timestamp in string, got " + value.getClass())
167+
.asRuntimeException();
168+
}
169+
return Timestamp.valueOf((String) value);
142170
default:
143171
throw io.grpc.Status.INVALID_ARGUMENT
144172
.withDescription("unsupported type " + typeName)

java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ public void validate(
7575
jdbcPk.add(pkResultSet.getString("COLUMN_NAME"));
7676
}
7777
} catch (SQLException e) {
78-
throw Status.INTERNAL.withCause(e).asRuntimeException();
78+
throw Status.INVALID_ARGUMENT
79+
.withDescription("failed to connect to target database: " + e.getSQLState())
80+
.asRuntimeException();
7981
}
8082

8183
if (!jdbcTableNames.contains(tableName)) {

java/tools/maven/checkstyle.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,6 @@ This file is based on the checkstyle file of Apache Beam.
165165
<!-- Enforce Java-style array declarations -->
166166
<module name="ArrayTypeStyle"/>
167167

168-
<module name="TodoComment">
169-
<!-- Checks that disallowed strings are not used in comments. -->
170-
<property name="format" value="(FIXME)|(XXX)|(@author)"/>
171-
</module>
172-
173168
<!--
174169
175170
IMPORT CHECKS

0 commit comments

Comments
 (0)