Skip to content

Commit 99501d1

Browse files
yufansongwenym1
andauthored
feat(connector-node): support stream chunk payload in connector node (risingwavelabs#8548)
Co-authored-by: William Wen <[email protected]>
1 parent 8c95702 commit 99501d1

File tree

17 files changed

+428
-21
lines changed

17 files changed

+428
-21
lines changed

ci/scripts/e2e-iceberg-sink-test.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,13 @@ echo "--- Download artifacts"
2525
mkdir -p target/debug
2626
buildkite-agent artifact download risingwave-"$profile" target/debug/
2727
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
28+
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
2829
mv target/debug/risingwave-"$profile" target/debug/risingwave
2930
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
31+
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so
32+
33+
export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
34+
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk
3035

3136
echo "--- Download connector node package"
3237
buildkite-agent artifact download risingwave-connector.tar.gz ./

ci/scripts/e2e-sink-test.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,13 @@ echo "--- Download artifacts"
2525
mkdir -p target/debug
2626
buildkite-agent artifact download risingwave-"$profile" target/debug/
2727
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
28+
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
2829
mv target/debug/risingwave-"$profile" target/debug/risingwave
2930
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
31+
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so
32+
33+
export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
34+
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk
3035

3136
echo "--- Download connector node package"
3237
buildkite-agent artifact download risingwave-connector.tar.gz ./

ci/scripts/e2e-source-test.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ echo "--- Download artifacts"
2828
mkdir -p target/debug
2929
buildkite-agent artifact download risingwave-"$profile" target/debug/
3030
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
31+
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
3132
mv target/debug/risingwave-"$profile" target/debug/risingwave
3233
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
34+
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so
35+
36+
export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
3337

3438

3539
echo "--- Download connector node package"

dashboard/proto/gen/connector_service.ts

Lines changed: 103 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dashboard/proto/gen/stream_plan.ts

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docker/Dockerfile

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ RUN rustup self update \
3434

3535
RUN cargo fetch
3636

37-
RUN cargo build -p risingwave_cmd_all --release --features "static-link static-log-level" && \
37+
RUN cargo build -p risingwave_cmd_all -p risingwave_java_binding --release --features "static-link static-log-level" && \
3838
mkdir -p /risingwave/bin && mv /risingwave/target/release/risingwave /risingwave/bin/ && \
39+
mkdir -p /risingwave/lib && mv /risingwave/target/release/librisingwave_java_binding.so /risingwave/lib && \
3940
cargo clean
4041

4142
RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true && \
@@ -47,10 +48,13 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certi
4748

4849
FROM image-base as risingwave
4950
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave
50-
RUN mkdir -p /risingwave/bin/connector-node
51+
RUN mkdir -p /risingwave/bin/connector-node && mkdir -p /risingwave/lib
5152
COPY --from=builder /risingwave/bin/risingwave /risingwave/bin/risingwave
5253
COPY --from=builder /risingwave/bin/connector-node /risingwave/bin/connector-node
5354
COPY --from=builder /risingwave/ui /risingwave/ui
55+
COPY --from=builder /risingwave/lib/librisingwave_java_binding.so /risingwave/lib/librisingwave_java_binding.so
56+
# Set java.library.path env to /risingwave/lib
57+
ENV RW_JAVA_BINDING_LIB_PATH /risingwave/lib
5458
# Set default playground mode to docker-playground profile
5559
ENV PLAYGROUND_PROFILE docker-playground
5660
# Set default dashboard UI to local path instead of github proxy

java/connector-node/assembly/scripts/start-service.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@ if [ -z "${port}" ]; then
2424
port=$PORT
2525
fi
2626

27-
java -classpath "${DIR}/libs/*" $MAIN --port ${port}
27+
java -classpath "${DIR}/libs/*" -Djava.library.path="${RW_JAVA_BINDING_LIB_PATH}" $MAIN --port ${port}

java/connector-node/risingwave-connector-service/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
<groupId>com.risingwave.java</groupId>
2626
<artifactId>proto</artifactId>
2727
</dependency>
28+
<dependency>
29+
<groupId>com.risingwave.java</groupId>
30+
<artifactId>java-binding</artifactId>
31+
</dependency>
2832
<dependency>
2933
<groupId>com.risingwave.java</groupId>
3034
<artifactId>connector-api</artifactId>

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.risingwave.connector.api.TableSchema;
2020
import com.risingwave.connector.api.sink.*;
21+
import com.risingwave.connector.deserializer.StreamChunkDeserializer;
2122
import com.risingwave.metrics.ConnectorNodeMetrics;
2223
import com.risingwave.metrics.MonitoredRowIterator;
2324
import com.risingwave.proto.ConnectorServiceProto;
@@ -202,6 +203,9 @@ private void bindSink(SinkConfig sinkConfig, ConnectorServiceProto.SinkPayloadFo
202203
case JSON:
203204
deserializer = new JsonDeserializer(tableSchema);
204205
break;
206+
case STREAM_CHUNK:
207+
deserializer = new StreamChunkDeserializer(tableSchema);
208+
break;
205209
}
206210
ConnectorNodeMetrics.incActiveConnections(sinkConfig.getConnectorType(), "node1");
207211
}

0 commit comments

Comments
 (0)