Skip to content

Commit 9df111b

Browse files
committed
Merge branch 'main' into lz/auto-cherrypick-pr
2 parents 93ec501 + 75f6025 commit 9df111b

File tree

34 files changed

+892
-30
lines changed

34 files changed

+892
-30
lines changed
Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
name: PR for release branch
22
on:
3-
push:
3+
pull_request:
44
branches:
55
- main
6+
types: ["closed"]
7+
68
jobs:
79
release_pull_request:
8-
if: "contains(github.event.head_commit.message, 'fix')"
10+
if: "contains(github.event.pull_request.labels.*.name, 'release-v1.0') && github.event.pull_request.merged == true"
911
runs-on: ubuntu-latest
1012
name: release_pull_request
1113
steps:
@@ -15,5 +17,8 @@ jobs:
1517
uses: risingwavelabs/github-action-cherry-pick@master
1618
with:
1719
pr_branch: 'v0.19.0-rc'
20+
labels: |
21+
cherry-pick
22+
body: 'Cherry picking #{old_pull_request_id} onto this branch'
1823
env:
19-
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
24+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docker/Dockerfile

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
FROM ubuntu:22.04 as base
1+
FROM ubuntu:22.04 AS base
22

33
ENV LANG en_US.utf8
44

5-
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl bash lld maven unzip libsasl2-dev
5+
RUN apt-get update \
6+
&& apt-get -y install ca-certificates build-essential libsasl2-dev openjdk-11-jdk
67

7-
FROM base as builder
8+
FROM base AS builder
9+
10+
RUN apt-get update && apt-get -y install make cmake protobuf-compiler curl bash lld maven unzip
811

912
SHELL ["/bin/bash", "-c"]
1013

@@ -32,28 +35,34 @@ RUN rustup self update \
3235
&& rustup show \
3336
&& rustup component add rustfmt
3437

35-
RUN cargo fetch
36-
37-
RUN cargo build -p risingwave_cmd_all --release --features "rw-static-link" && \
38+
RUN cargo fetch && \
39+
cargo build -p risingwave_cmd_all --release --features "rw-static-link" && \
3840
mkdir -p /risingwave/bin && mv /risingwave/target/release/risingwave /risingwave/bin/ && \
41+
cp ./target/release/build/tikv-jemalloc-sys-*/out/build/bin/jeprof /risingwave/bin/ && \
3942
mkdir -p /risingwave/lib && cargo clean
4043

4144
RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true -Djava.binding.release=true && \
4245
mkdir -p /risingwave/bin/connector-node && \
4346
tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node
4447

45-
FROM ubuntu:22.04 as image-base
46-
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk libsasl2-dev && rm -rf /var/lib/{apt,dpkg,cache,log}/
48+
FROM base AS risingwave
4749

48-
FROM image-base as risingwave
4950
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave
51+
52+
RUN apt-get -y install gdb \
53+
&& rm -rf /var/lib/{apt,dpkg,cache,log}/
54+
5055
RUN mkdir -p /risingwave/bin/connector-node && mkdir -p /risingwave/lib
56+
5157
COPY --from=builder /risingwave/bin/risingwave /risingwave/bin/risingwave
5258
COPY --from=builder /risingwave/bin/connector-node /risingwave/bin/connector-node
5359
COPY --from=builder /risingwave/ui /risingwave/ui
60+
COPY --from=builder /risingwave/bin/jeprof /usr/local/bin/jeprof
61+
5462
# Set default playground mode to docker-playground profile
5563
ENV PLAYGROUND_PROFILE docker-playground
5664
# Set default dashboard UI to local path instead of github proxy
5765
ENV RW_DASHBOARD_UI_PATH /risingwave/ui
66+
5867
ENTRYPOINT [ "/risingwave/bin/risingwave" ]
5968
CMD [ "playground" ]

java/.gitignore

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Java
22
target/
33
test_db/
4+
bin/
5+
.settings/
46
*.log
5-
*.class
7+
*.class
8+
.project
9+
.factorypath
10+
.classpath

java/connector-node/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ If you meet problem, you can try the following to skip the unit test:
2121
mvn clean package -DskipTests=true
2222
```
2323

24+
To disable building the rust library, you can try the following:
25+
```
26+
mvn clean package -Dno-build-rust
27+
```
28+
2429
This will create a `.tar.gz` file with the Connector Node and all its dependencies in the `risingwave/java/connector-node/assembly/target` directory. To run the Connector Node, execute the following command:
2530

2631
```

java/connector-node/assembly/assembly.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
<include>*:risingwave-source-cdc</include>
4040

4141
<!-- Sink connectors -->
42+
<include>*:risingwave-sink-es</include>
4243
<include>*:risingwave-sink-jdbc</include>
4344
<include>*:risingwave-sink-iceberg</include>
4445
<include>*:risingwave-sink-deltalake</include>

java/connector-node/assembly/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929
<groupId>com.risingwave.java</groupId>
3030
<artifactId>risingwave-source-cdc</artifactId>
3131
</dependency>
32+
<dependency>
33+
<groupId>com.risingwave.java</groupId>
34+
<artifactId>risingwave-sink-es</artifactId>
35+
</dependency>
3236
<dependency>
3337
<groupId>com.risingwave.java</groupId>
3438
<artifactId>risingwave-sink-jdbc</artifactId>

java/connector-node/python-client/integration_tests.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ def validate_jdbc_sink(input_file):
156156
rows[i][j]))
157157
exit(1)
158158

159+
def test_elasticsearch_sink(input_file):
160+
test_sink("elasticsearch",
161+
{"url": "http://127.0.0.1:9200",
162+
"index": "test"},
163+
input_file)
159164

160165
def test_iceberg_sink(input_file):
161166
test_sink("iceberg",
@@ -196,6 +201,7 @@ def test_deltalake_sink(input_file):
196201
parser.add_argument('--iceberg_sink', action='store_true', help="run iceberg sink test")
197202
parser.add_argument('--upsert_iceberg_sink', action='store_true', help="run upsert iceberg sink test")
198203
parser.add_argument('--deltalake_sink', action='store_true', help="run deltalake sink test")
204+
parser.add_argument('--es_sink', action='store_true', help='run elasticsearch sink test')
199205
parser.add_argument('--input_file', default="./data/sink_input.json", help="input data to run tests")
200206
args = parser.parse_args()
201207
if args.file_sink:
@@ -208,3 +214,5 @@ def test_deltalake_sink(input_file):
208214
test_upsert_iceberg_sink(args.input_file)
209215
if args.deltalake_sink:
210216
test_deltalake_sink(args.input_file)
217+
if args.es_sink:
218+
test_elasticsearch_sink(args.input_file)

java/connector-node/risingwave-connector-service/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,10 @@
9494
<artifactId>risingwave-sink-deltalake</artifactId>
9595
<scope>provided</scope>
9696
</dependency>
97+
<dependency>
98+
<groupId>com.risingwave.java</groupId>
99+
<artifactId>risingwave-sink-es</artifactId>
100+
<scope>provided</scope>
101+
</dependency>
97102
</dependencies>
98103
</project>

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public static SinkFactory getSinkFactory(String sinkType) {
2929
return new IcebergSinkFactory();
3030
case "deltalake":
3131
return new DeltaLakeSinkFactory();
32+
case "elasticsearch":
33+
return new EsSinkFactory();
3234
default:
3335
throw UNIMPLEMENTED
3436
.withDescription("unknown sink type: " + sinkType)

java/connector-node/risingwave-connector-test/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@
135135
<version>${testcontainers.version}</version>
136136
<scope>test</scope>
137137
</dependency>
138+
<dependency>
139+
<groupId>org.testcontainers</groupId>
140+
<artifactId>elasticsearch</artifactId>
141+
<version>${testcontainers.version}</version>
142+
<scope>test</scope>
143+
</dependency>
138144
<dependency>
139145
<groupId>com.fasterxml.jackson.core</groupId>
140146
<artifactId>jackson-databind</artifactId>
@@ -163,5 +169,10 @@
163169
<artifactId>risingwave-sink-jdbc</artifactId>
164170
<scope>test</scope>
165171
</dependency>
172+
<dependency>
173+
<groupId>com.risingwave.java</groupId>
174+
<artifactId>risingwave-sink-es</artifactId>
175+
<scope>test</scope>
176+
</dependency>
166177
</dependencies>
167178
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.risingwave.connector.sink.elasticsearch;
16+
17+
import static org.junit.Assert.assertEquals;
18+
import static org.junit.Assert.fail;
19+
20+
import com.google.common.collect.Iterators;
21+
import com.google.common.collect.Lists;
22+
import com.risingwave.connector.EsSink;
23+
import com.risingwave.connector.EsSinkConfig;
24+
import com.risingwave.connector.api.TableSchema;
25+
import com.risingwave.connector.api.sink.ArraySinkRow;
26+
import com.risingwave.proto.Data;
27+
import com.risingwave.proto.Data.DataType.TypeName;
28+
import com.risingwave.proto.Data.Op;
29+
import java.io.IOException;
30+
import java.util.Map;
31+
import org.elasticsearch.action.search.SearchRequest;
32+
import org.elasticsearch.action.search.SearchResponse;
33+
import org.elasticsearch.client.RequestOptions;
34+
import org.elasticsearch.client.RestHighLevelClient;
35+
import org.elasticsearch.index.query.QueryBuilders;
36+
import org.elasticsearch.search.SearchHit;
37+
import org.elasticsearch.search.SearchHits;
38+
import org.elasticsearch.search.builder.SearchSourceBuilder;
39+
import org.junit.Test;
40+
import org.testcontainers.elasticsearch.ElasticsearchContainer;
41+
42+
public class EsSinkTest {
43+
44+
static TableSchema getTestTableSchema() {
45+
return new TableSchema(
46+
Lists.newArrayList("id", "name"),
47+
Lists.newArrayList(
48+
Data.DataType.newBuilder().setTypeName(TypeName.INT32).build(),
49+
Data.DataType.newBuilder().setTypeName(TypeName.VARCHAR).build()),
50+
Lists.newArrayList("id", "name"));
51+
}
52+
53+
public void testEsSink(ElasticsearchContainer container) throws IOException {
54+
EsSink sink =
55+
new EsSink(
56+
new EsSinkConfig(container.getHttpHostAddress(), "test", "$"),
57+
getTestTableSchema());
58+
sink.write(
59+
Iterators.forArray(
60+
new ArraySinkRow(Op.INSERT, 1, "Alice"),
61+
new ArraySinkRow(Op.INSERT, 2, "Bob")));
62+
sink.sync();
63+
// container is slow here, but our default flush time is 5s,
64+
// so 2s is enough for sync test
65+
try {
66+
Thread.sleep(3000);
67+
} catch (InterruptedException e) {
68+
fail(e.getMessage());
69+
}
70+
71+
RestHighLevelClient client = sink.getClient();
72+
SearchRequest searchRequest = new SearchRequest("test");
73+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
74+
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
75+
searchRequest.source(searchSourceBuilder);
76+
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
77+
78+
SearchHits hits = searchResponse.getHits();
79+
assertEquals(2, hits.getHits().length);
80+
81+
SearchHit hit = hits.getAt(0);
82+
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
83+
assertEquals(1, sourceAsMap.get("id"));
84+
assertEquals("Alice", sourceAsMap.get("name"));
85+
assertEquals("1$Alice", hit.getId());
86+
87+
hit = hits.getAt(1);
88+
sourceAsMap = hit.getSourceAsMap();
89+
assertEquals(2, sourceAsMap.get("id"));
90+
assertEquals("Bob", sourceAsMap.get("name"));
91+
assertEquals("2$Bob", hit.getId());
92+
93+
sink.drop();
94+
}
95+
96+
@Test
97+
public void testElasticSearch() throws IOException {
98+
ElasticsearchContainer container =
99+
new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.11.0");
100+
container.start();
101+
testEsSink(container);
102+
container.stop();
103+
}
104+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<parent>
5+
<artifactId>java-parent</artifactId>
6+
<groupId>com.risingwave.java</groupId>
7+
<version>1.0-SNAPSHOT</version>
8+
<relativePath>../../pom.xml</relativePath>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
13+
<artifactId>risingwave-sink-es</artifactId>
14+
<version>1.0-SNAPSHOT</version>
15+
<name>risingwave-sink-es</name>
16+
17+
<dependencies>
18+
<dependency>
19+
<groupId>com.risingwave.java</groupId>
20+
<artifactId>proto</artifactId>
21+
</dependency>
22+
<dependency>
23+
<groupId>com.risingwave.java</groupId>
24+
<artifactId>connector-api</artifactId>
25+
</dependency>
26+
<dependency>
27+
<groupId>org.apache.logging.log4j</groupId>
28+
<artifactId>log4j-api</artifactId>
29+
</dependency>
30+
<dependency>
31+
<groupId>org.apache.logging.log4j</groupId>
32+
<artifactId>log4j-slf4j-impl</artifactId>
33+
</dependency>
34+
<dependency>
35+
<groupId>org.apache.logging.log4j</groupId>
36+
<artifactId>log4j-core</artifactId>
37+
</dependency>
38+
<dependency>
39+
<groupId>org.apache.commons</groupId>
40+
<artifactId>commons-text</artifactId>
41+
</dependency>
42+
<dependency>
43+
<groupId>com.fasterxml.jackson.core</groupId>
44+
<artifactId>jackson-databind</artifactId>
45+
<version>${jackson.version}</version>
46+
</dependency>
47+
<dependency>
48+
<groupId>com.fasterxml.jackson.core</groupId>
49+
<artifactId>jackson-core</artifactId>
50+
<version>${jackson.version}</version>
51+
</dependency>
52+
53+
<!-- ElasticSearch drivers -->
54+
<dependency>
55+
<groupId>org.elasticsearch</groupId>
56+
<artifactId>elasticsearch</artifactId>
57+
</dependency>
58+
<dependency>
59+
<groupId>org.elasticsearch.client</groupId>
60+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
61+
</dependency>
62+
</dependencies>
63+
64+
</project>

0 commit comments

Comments
 (0)