Skip to content

Commit 78ddbce

Browse files
authored
feat(connector-node): specify sink payload format in start sink and call close for iterator and sink row (risingwavelabs#8585)
1 parent f4a2f8d commit 78ddbce

File tree

29 files changed

+441
-297
lines changed

29 files changed

+441
-297
lines changed

dashboard/proto/gen/connector_service.ts

Lines changed: 43 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

java/connector-node/assembly/scripts/start-service.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ while getopts ":h:p:" o; do
1515
done
1616
shift $((OPTIND-1))
1717

18-
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
18+
DIR="$( cd "$( dirname "$0" )" && pwd )"
1919
MAIN='com.risingwave.connector.ConnectorService'
2020
PORT=50051
2121

java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkrow.java renamed to java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkRow.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
import com.risingwave.proto.Data;
1818

19-
public class ArraySinkrow implements SinkRow {
19+
public class ArraySinkRow implements SinkRow {
2020
public final Object[] values;
2121
public final Data.Op op;
2222

23-
public ArraySinkrow(Data.Op op, Object... value) {
23+
public ArraySinkRow(Data.Op op, Object... value) {
2424
this.op = op;
2525
this.values = value;
2626
}
@@ -39,4 +39,7 @@ public Data.Op getOp() {
3939
public int size() {
4040
return values.length;
4141
}
42+
43+
@Override
44+
public void close() throws Exception {}
4245
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2023 RisingWave Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.risingwave.connector.api.sink;
18+
19+
import java.util.Iterator;
20+
21+
public interface CloseableIterator<E> extends AutoCloseable, Iterator<E> {}
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package com.risingwave.connector;
15+
package com.risingwave.connector.api.sink;
1616

17-
import com.risingwave.connector.api.sink.SinkRow;
18-
import java.util.Iterator;
17+
import com.risingwave.proto.ConnectorServiceProto;
1918

2019
public interface Deserializer {
21-
Iterator<SinkRow> deserialize(Object payload);
20+
CloseableIterator<SinkRow> deserialize(
21+
ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch);
2222
}

java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616

1717
import com.risingwave.proto.Data;
1818

19-
public interface SinkRow {
20-
public Object get(int index);
19+
public interface SinkRow extends AutoCloseable {
20+
Object get(int index);
2121

22-
public Data.Op getOp();
22+
Data.Op getOp();
2323

24-
public int size();
24+
int size();
2525
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2023 RisingWave Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.risingwave.connector.api.sink;
18+
19+
import java.util.Iterator;
20+
21+
public class TrivialCloseIterator<E> implements CloseableIterator<E> {
22+
23+
private final Iterator<E> inner;
24+
25+
public TrivialCloseIterator(Iterator<E> inner) {
26+
this.inner = inner;
27+
}
28+
29+
@Override
30+
public void close() throws Exception {}
31+
32+
@Override
33+
public boolean hasNext() {
34+
return inner.hasNext();
35+
}
36+
37+
@Override
38+
public E next() {
39+
return inner.next();
40+
}
41+
}

java/connector-node/python-client/integration_tests.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def test_upsert_sink(type, prop, input_file):
4646
stub = connector_service_pb2_grpc.ConnectorServiceStub(channel)
4747
request_list = [
4848
connector_service_pb2.SinkStreamRequest(start=connector_service_pb2.SinkStreamRequest.StartSink(
49+
format=connector_service_pb2.SinkPayloadFormat.JSON,
4950
sink_config=connector_service_pb2.SinkConfig(
5051
connector_type=type,
5152
properties=prop,
@@ -85,6 +86,7 @@ def test_sink(type, prop, input_file):
8586
stub = connector_service_pb2_grpc.ConnectorServiceStub(channel)
8687
request_list = [
8788
connector_service_pb2.SinkStreamRequest(start=connector_service_pb2.SinkStreamRequest.StartSink(
89+
format=connector_service_pb2.SinkPayloadFormat.JSON,
8890
sink_config=connector_service_pb2.SinkConfig(
8991
connector_type=type,
9092
properties=prop,

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,25 +57,28 @@ public FileSink(String sinkPath, TableSchema tableSchema) {
5757
@Override
5858
public void write(Iterator<SinkRow> rows) {
5959
while (rows.hasNext()) {
60-
SinkRow row = rows.next();
61-
switch (row.getOp()) {
62-
case INSERT:
63-
String buf =
64-
new Gson()
65-
.toJson(
66-
IntStream.range(0, row.size())
67-
.mapToObj(row::get)
68-
.toArray());
69-
try {
70-
sinkWriter.write(buf + System.lineSeparator());
71-
} catch (IOException e) {
72-
throw INTERNAL.withCause(e).asRuntimeException();
73-
}
74-
break;
75-
default:
76-
throw UNIMPLEMENTED
77-
.withDescription("unsupported operation: " + row.getOp())
78-
.asRuntimeException();
60+
try (SinkRow row = rows.next()) {
61+
switch (row.getOp()) {
62+
case INSERT:
63+
String buf =
64+
new Gson()
65+
.toJson(
66+
IntStream.range(0, row.size())
67+
.mapToObj(row::get)
68+
.toArray());
69+
try {
70+
sinkWriter.write(buf + System.lineSeparator());
71+
} catch (IOException e) {
72+
throw INTERNAL.withCause(e).asRuntimeException();
73+
}
74+
break;
75+
default:
76+
throw UNIMPLEMENTED
77+
.withDescription("unsupported operation: " + row.getOp())
78+
.asRuntimeException();
79+
}
80+
} catch (Exception e) {
81+
throw new RuntimeException(e);
7982
}
8083
}
8184
}

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818

1919
import com.google.gson.Gson;
2020
import com.risingwave.connector.api.TableSchema;
21-
import com.risingwave.connector.api.sink.ArraySinkrow;
22-
import com.risingwave.connector.api.sink.SinkRow;
21+
import com.risingwave.connector.api.sink.*;
22+
import com.risingwave.proto.ConnectorServiceProto;
2323
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload;
2424
import com.risingwave.proto.Data;
25-
import java.util.Iterator;
2625
import java.util.Map;
2726

2827
public class JsonDeserializer implements Deserializer {
@@ -33,34 +32,39 @@ public JsonDeserializer(TableSchema tableSchema) {
3332
}
3433

3534
@Override
36-
public Iterator<SinkRow> deserialize(Object payload) {
37-
if (!(payload instanceof JsonPayload)) {
35+
public CloseableIterator<SinkRow> deserialize(
36+
ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch) {
37+
if (!writeBatch.hasJsonPayload()) {
3838
throw INVALID_ARGUMENT
39-
.withDescription("expected JsonPayload, got " + payload.getClass().getName())
39+
.withDescription("expected JsonPayload, got " + writeBatch.getPayloadCase())
4040
.asRuntimeException();
4141
}
42-
JsonPayload jsonPayload = (JsonPayload) payload;
43-
return jsonPayload.getRowOpsList().stream()
44-
.map(
45-
rowOp -> {
46-
Map columnValues = new Gson().fromJson(rowOp.getLine(), Map.class);
47-
Object[] values = new Object[columnValues.size()];
48-
for (String columnName : tableSchema.getColumnNames()) {
49-
if (!columnValues.containsKey(columnName)) {
50-
throw INVALID_ARGUMENT
51-
.withDescription(
52-
"column " + columnName + " not found in json")
53-
.asRuntimeException();
54-
}
55-
Data.DataType.TypeName typeName =
56-
tableSchema.getColumnType(columnName);
57-
values[tableSchema.getColumnIndex(columnName)] =
58-
validateJsonDataTypes(
59-
typeName, columnValues.get(columnName));
60-
}
61-
return (SinkRow) new ArraySinkrow(rowOp.getOpType(), values);
62-
})
63-
.iterator();
42+
JsonPayload jsonPayload = writeBatch.getJsonPayload();
43+
return new TrivialCloseIterator<>(
44+
jsonPayload.getRowOpsList().stream()
45+
.map(
46+
rowOp -> {
47+
Map columnValues =
48+
new Gson().fromJson(rowOp.getLine(), Map.class);
49+
Object[] values = new Object[columnValues.size()];
50+
for (String columnName : tableSchema.getColumnNames()) {
51+
if (!columnValues.containsKey(columnName)) {
52+
throw INVALID_ARGUMENT
53+
.withDescription(
54+
"column "
55+
+ columnName
56+
+ " not found in json")
57+
.asRuntimeException();
58+
}
59+
Data.DataType.TypeName typeName =
60+
tableSchema.getColumnType(columnName);
61+
values[tableSchema.getColumnIndex(columnName)] =
62+
validateJsonDataTypes(
63+
typeName, columnValues.get(columnName));
64+
}
65+
return (SinkRow) new ArraySinkRow(rowOp.getOpType(), values);
66+
})
67+
.iterator());
6468
}
6569

6670
private static Long castLong(Object value) {

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/PrintSink.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,18 @@ public PrintSink(Map<String, String> properties, TableSchema tableSchema, PrintS
4040
@Override
4141
public void write(Iterator<SinkRow> rows) {
4242
while (rows.hasNext()) {
43-
SinkRow row = rows.next();
44-
out.println(
45-
"PrintSink: "
46-
+ row.getOp().name()
47-
+ " values "
48-
+ Arrays.toString(
49-
IntStream.range(0, row.size()).mapToObj(row::get).toArray()));
43+
try (SinkRow row = rows.next()) {
44+
out.println(
45+
"PrintSink: "
46+
+ row.getOp().name()
47+
+ " values "
48+
+ Arrays.toString(
49+
IntStream.range(0, row.size())
50+
.mapToObj(row::get)
51+
.toArray()));
52+
} catch (Exception e) {
53+
throw new RuntimeException(e);
54+
}
5055
}
5156
}
5257

0 commit comments

Comments
 (0)