Skip to content

Commit d90165a

Browse files
authored
refactor(connector): use config file to initiate a debezium source connector (risingwavelabs#8539)
1 parent bba43ba commit d90165a

20 files changed

+303
-446
lines changed

ci/scripts/e2e-iceberg-sink-test.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ cargo make pre-start-dev
4545
cargo make link-all-in-one-binaries
4646

4747
echo "--- starting risingwave cluster with connector node"
48+
mkdir -p .risingwave/log
4849
./connector-node/start-service.sh -p 50051 > .risingwave/log/connector-sink.log 2>&1 &
4950
cargo make ci-start ci-iceberg-test
5051
sleep 1

ci/scripts/e2e-sink-test.sh

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ psql -h db -U postgres -d test -c "CREATE TABLE t_remote (id serial PRIMARY KEY,
6464

6565
node_port=50051
6666
node_timeout=10
67-
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-source.log 2>&1 &
67+
68+
echo "--- starting risingwave cluster with connector node"
69+
cargo make ci-start ci-1cn-1fe
70+
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 &
71+
6872
echo "waiting for connector node to start"
6973
start_time=$(date +%s)
7074
while :
@@ -83,8 +87,6 @@ do
8387
sleep 0.1
8488
done
8589

86-
echo "--- starting risingwave cluster with connector node"
87-
cargo make ci-start ci-1cn-1fe
8890

8991
echo "--- testing sinks"
9092
sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'

ci/scripts/e2e-source-test.sh

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ set -euo pipefail
66
source ci/scripts/common.env.sh
77

88
# prepare environment
9-
export CONNECTOR_RPC_ENDPOINT="localhost:60061"
9+
export CONNECTOR_RPC_ENDPOINT="localhost:50051"
1010

1111
while getopts 'p:' opt; do
1212
case ${opt} in
@@ -65,7 +65,10 @@ psql -h db -U postgres -d cdc_test < ./e2e_test/source/cdc/postgres_cdc.sql
6565

6666
node_port=50051
6767
node_timeout=10
68-
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-source.log 2>&1 &
68+
69+
echo "--- starting risingwave cluster with connector node"
70+
cargo make ci-start ci-1cn-1fe-with-recovery
71+
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 &
6972

7073
echo "waiting for connector node to start"
7174
start_time=$(date +%s)
@@ -84,9 +87,6 @@ do
8487
fi
8588
sleep 0.1
8689
done
87-
88-
# start risingwave cluster
89-
cargo make ci-start ci-1cn-1fe-with-recovery
9090
sleep 2
9191

9292
echo "---- mysql & postgres cdc validate test"

java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java

Lines changed: 0 additions & 66 deletions
This file was deleted.

java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/SourceHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
/** Handler for RPC request */
2121
public interface SourceHandler {
22-
void handle(
22+
void startSource(
2323
ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse>
2424
responseObserver);
2525
}

java/connector-node/risingwave-connector-service/pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,14 @@
4747
<groupId>com.google.code.gson</groupId>
4848
<artifactId>gson</artifactId>
4949
</dependency>
50-
50+
<dependency>
51+
<groupId>org.apache.commons</groupId>
52+
<artifactId>commons-text</artifactId>
53+
</dependency>
54+
<dependency>
55+
<groupId>commons-io</groupId>
56+
<artifactId>commons-io</artifactId>
57+
</dependency>
5158
<dependency>
5259
<groupId>commons-cli</groupId>
5360
<artifactId>commons-cli</artifactId>

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414

1515
package com.risingwave.sourcenode;
1616

17-
import com.risingwave.connector.api.source.ConnectorConfig;
1817
import com.risingwave.connector.api.source.SourceTypeE;
1918
import com.risingwave.proto.ConnectorServiceProto;
2019
import com.risingwave.proto.Data.DataType;
20+
import com.risingwave.sourcenode.common.DbzConnectorConfig;
2121
import com.risingwave.sourcenode.core.SourceHandlerFactory;
2222
import io.grpc.Status;
2323
import io.grpc.StatusException;
@@ -46,19 +46,19 @@ public void handle(ConnectorServiceProto.GetEventStreamRequest request) {
4646
break;
4747
case START:
4848
var startRequest = request.getStart();
49-
var handler =
50-
SourceHandlerFactory.createSourceHandler(
51-
SourceTypeE.valueOf(startRequest.getSourceType()),
52-
startRequest.getSourceId(),
53-
startRequest.getStartOffset(),
54-
new ConnectorConfig(startRequest.getPropertiesMap()));
55-
if (handler == null) {
56-
LOG.error("failed to create source handler");
57-
responseObserver.onCompleted();
58-
} else {
59-
handler.handle(
49+
try {
50+
var handler =
51+
SourceHandlerFactory.createSourceHandler(
52+
SourceTypeE.valueOf(startRequest.getSourceType()),
53+
startRequest.getSourceId(),
54+
startRequest.getStartOffset(),
55+
startRequest.getPropertiesMap());
56+
handler.startSource(
6057
(ServerCallStreamObserver<ConnectorServiceProto.GetEventStreamResponse>)
6158
responseObserver);
59+
} catch (Throwable t) {
60+
LOG.error("failed to start source", t);
61+
responseObserver.onError(t);
6262
}
6363
break;
6464
case REQUEST_NOT_SET:
@@ -75,11 +75,11 @@ private void validateDbProperties(
7575
String jdbcUrl =
7676
toJdbcPrefix(validate.getSourceType())
7777
+ "://"
78-
+ props.get(ConnectorConfig.HOST)
78+
+ props.get(DbzConnectorConfig.HOST)
7979
+ ":"
80-
+ props.get(ConnectorConfig.PORT)
80+
+ props.get(DbzConnectorConfig.PORT)
8181
+ "/"
82-
+ props.get(ConnectorConfig.DB_NAME);
82+
+ props.get(DbzConnectorConfig.DB_NAME);
8383
LOG.debug("validate jdbc url: {}", jdbcUrl);
8484

8585
var sqlStmts = new Properties();
@@ -93,8 +93,8 @@ private void validateDbProperties(
9393
try (var conn =
9494
DriverManager.getConnection(
9595
jdbcUrl,
96-
props.get(ConnectorConfig.USER),
97-
props.get(ConnectorConfig.PASSWORD))) {
96+
props.get(DbzConnectorConfig.USER),
97+
props.get(DbzConnectorConfig.PASSWORD))) {
9898
// usernamed and password are correct
9999
var dbMeta = conn.getMetaData();
100100

@@ -138,8 +138,8 @@ private void validateDbProperties(
138138
}
139139
// check whether table exist
140140
try (var stmt = conn.prepareStatement(sqlStmts.getProperty("mysql.table"))) {
141-
stmt.setString(1, props.get(ConnectorConfig.DB_NAME));
142-
stmt.setString(2, props.get(ConnectorConfig.TABLE_NAME));
141+
stmt.setString(1, props.get(DbzConnectorConfig.DB_NAME));
142+
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
143143
var res = stmt.executeQuery();
144144
while (res.next()) {
145145
var ret = res.getInt(1);
@@ -152,8 +152,8 @@ private void validateDbProperties(
152152
try (var stmt =
153153
conn.prepareStatement(sqlStmts.getProperty("mysql.table_schema"))) {
154154
var sourceSchema = validate.getTableSchema();
155-
stmt.setString(1, props.get(ConnectorConfig.DB_NAME));
156-
stmt.setString(2, props.get(ConnectorConfig.TABLE_NAME));
155+
stmt.setString(1, props.get(DbzConnectorConfig.DB_NAME));
156+
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
157157
var res = stmt.executeQuery();
158158
var pkFields = new HashSet<String>();
159159
int index = 0;
@@ -203,8 +203,8 @@ private void validateDbProperties(
203203
}
204204
// check schema name and table name
205205
try (var stmt = conn.prepareStatement(sqlStmts.getProperty("postgres.table"))) {
206-
stmt.setString(1, props.get(ConnectorConfig.PG_SCHEMA_NAME));
207-
stmt.setString(2, props.get(ConnectorConfig.TABLE_NAME));
206+
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
207+
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
208208
var res = stmt.executeQuery();
209209
while (res.next()) {
210210
var ret = res.getString(1);
@@ -219,9 +219,9 @@ private void validateDbProperties(
219219
try (var stmt = conn.prepareStatement(sqlStmts.getProperty("postgres.pk"))) {
220220
stmt.setString(
221221
1,
222-
props.get(ConnectorConfig.PG_SCHEMA_NAME)
222+
props.get(DbzConnectorConfig.PG_SCHEMA_NAME)
223223
+ "."
224-
+ props.get(ConnectorConfig.TABLE_NAME));
224+
+ props.get(DbzConnectorConfig.TABLE_NAME));
225225

226226
var res = stmt.executeQuery();
227227
var pkFields = new HashSet<String>();
@@ -237,8 +237,8 @@ private void validateDbProperties(
237237
// check whether source schema match table schema on upstream
238238
try (var stmt =
239239
conn.prepareStatement(sqlStmts.getProperty("postgres.table_schema"))) {
240-
stmt.setString(1, props.get(ConnectorConfig.PG_SCHEMA_NAME));
241-
stmt.setString(2, props.get(ConnectorConfig.TABLE_NAME));
240+
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
241+
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
242242
var res = stmt.executeQuery();
243243
var sourceSchema = validate.getTableSchema();
244244
int index = 0;

0 commit comments

Comments
 (0)