Skip to content

Commit 8be4734

Browse files
authored
feat(java-binding): support java binding on stream chunk (risingwavelabs#8517)
1 parent 8be23f4 commit 8be4734

20 files changed

+635
-188
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ci/scripts/java-binding-test.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,6 @@ cargo make ingest-data-and-run-java-binding
5353

5454
echo "--- Kill cluster"
5555
cargo make ci-kill
56+
57+
echo "--- run stream chunk java binding"
58+
cargo make run-java-binding-stream-chunk-demo

java/com_risingwave_java_binding_Binding.h

Lines changed: 38 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/Demo.java renamed to java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java

Lines changed: 4 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package com.risingwave.java.binding;
1616

17+
import static com.risingwave.java.binding.Utils.validateRow;
18+
1719
import com.risingwave.java.utils.MetaClient;
1820
import com.risingwave.proto.Catalog.Table;
1921
import com.risingwave.proto.Hummock.HummockVersion;
@@ -27,7 +29,7 @@
2729
import java.util.concurrent.ScheduledThreadPoolExecutor;
2830

2931
/** Hello world! */
30-
public class Demo {
32+
public class HummockReadDemo {
3133
public static void main(String[] args) {
3234
String objectStore = System.getenv("OBJECT_STORE");
3335
String dbName = System.getenv("DB_NAME");
@@ -67,7 +69,7 @@ public static void main(String[] args) {
6769
.addAllVnodeIds(vnodeList)
6870
.build();
6971

70-
try (Iterator iter = new Iterator(readPlan)) {
72+
try (HummockIterator iter = new HummockIterator(readPlan)) {
7173
int count = 0;
7274
while (true) {
7375
try (KeyedRow row = iter.next()) {
@@ -92,43 +94,4 @@ public static void main(String[] args) {
9294

9395
scheduledThreadPool.shutdown();
9496
}
95-
96-
static void validateRow(KeyedRow row) {
97-
// The validation of row data are according to the data generation rule
98-
// defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py
99-
short rowIndex = row.getShort(0);
100-
if (row.getInt(1) != rowIndex) {
101-
throw new RuntimeException(
102-
String.format("invalid int value: %s %s", row.getInt(1), rowIndex));
103-
}
104-
if (row.getLong(2) != rowIndex) {
105-
throw new RuntimeException(
106-
String.format("invalid long value: %s %s", row.getLong(2), rowIndex));
107-
}
108-
if (row.getFloat(3) != (float) rowIndex) {
109-
throw new RuntimeException(
110-
String.format("invalid float value: %s %s", row.getFloat(3), rowIndex));
111-
}
112-
if (row.getDouble(4) != (double) rowIndex) {
113-
throw new RuntimeException(
114-
String.format("invalid double value: %s %s", row.getDouble(4), rowIndex));
115-
}
116-
if (row.getBoolean(5) != (rowIndex % 3 == 0)) {
117-
throw new RuntimeException(
118-
String.format(
119-
"invalid bool value: %s %s", row.getBoolean(5), (rowIndex % 3 == 0)));
120-
}
121-
if (!row.getString(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) {
122-
throw new RuntimeException(
123-
String.format(
124-
"invalid string value: %s %s",
125-
row.getString(6),
126-
((Short) rowIndex).toString().repeat((rowIndex % 10) + 1)));
127-
}
128-
if (row.isNull(7) != (rowIndex % 5 == 0)) {
129-
throw new RuntimeException(
130-
String.format(
131-
"invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0)));
132-
}
133-
}
13497
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.risingwave.java.binding;
16+
17+
import static com.risingwave.java.binding.Utils.validateRow;
18+
19+
import java.io.IOException;
20+
21+
public class StreamChunkDemo {
22+
23+
public static void main(String[] args) throws IOException {
24+
byte[] payload = System.in.readAllBytes();
25+
try (StreamChunkIterator iter = new StreamChunkIterator(payload)) {
26+
int count = 0;
27+
while (true) {
28+
try (StreamChunkRow row = iter.next()) {
29+
if (row == null) {
30+
break;
31+
}
32+
count += 1;
33+
validateRow(row);
34+
}
35+
}
36+
int expectedCount = 30000;
37+
if (count != expectedCount) {
38+
throw new RuntimeException(
39+
String.format("row count is %s, should be %s", count, expectedCount));
40+
}
41+
}
42+
}
43+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.risingwave.java.binding;
16+
17+
public class Utils {
18+
public static void validateRow(BaseRow row) {
19+
// The validation of row data are according to the data generation rule
20+
// defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py
21+
short rowIndex = row.getShort(0);
22+
if (row.getInt(1) != rowIndex) {
23+
throw new RuntimeException(
24+
String.format("invalid int value: %s %s", row.getInt(1), rowIndex));
25+
}
26+
if (row.getLong(2) != rowIndex) {
27+
throw new RuntimeException(
28+
String.format("invalid long value: %s %s", row.getLong(2), rowIndex));
29+
}
30+
if (row.getFloat(3) != (float) rowIndex) {
31+
throw new RuntimeException(
32+
String.format("invalid float value: %s %s", row.getFloat(3), rowIndex));
33+
}
34+
if (row.getDouble(4) != (double) rowIndex) {
35+
throw new RuntimeException(
36+
String.format("invalid double value: %s %s", row.getDouble(4), rowIndex));
37+
}
38+
if (row.getBoolean(5) != (rowIndex % 3 == 0)) {
39+
throw new RuntimeException(
40+
String.format(
41+
"invalid bool value: %s %s", row.getBoolean(5), (rowIndex % 3 == 0)));
42+
}
43+
if (!row.getString(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) {
44+
throw new RuntimeException(
45+
String.format(
46+
"invalid string value: %s %s",
47+
row.getString(6),
48+
((Short) rowIndex).toString().repeat((rowIndex % 10) + 1)));
49+
}
50+
if (row.isNull(7) != (rowIndex % 5 == 0)) {
51+
throw new RuntimeException(
52+
String.format(
53+
"invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0)));
54+
}
55+
}
56+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.risingwave.java.binding;
16+
17+
public class BaseRow implements AutoCloseable {
18+
protected final long pointer;
19+
private boolean isClosed;
20+
21+
protected BaseRow(long pointer) {
22+
this.pointer = pointer;
23+
this.isClosed = false;
24+
}
25+
26+
public boolean isNull(int index) {
27+
return Binding.rowIsNull(pointer, index);
28+
}
29+
30+
public short getShort(int index) {
31+
return Binding.rowGetInt16Value(pointer, index);
32+
}
33+
34+
public int getInt(int index) {
35+
return Binding.rowGetInt32Value(pointer, index);
36+
}
37+
38+
public long getLong(int index) {
39+
return Binding.rowGetInt64Value(pointer, index);
40+
}
41+
42+
public float getFloat(int index) {
43+
return Binding.rowGetFloatValue(pointer, index);
44+
}
45+
46+
public double getDouble(int index) {
47+
return Binding.rowGetDoubleValue(pointer, index);
48+
}
49+
50+
public boolean getBoolean(int index) {
51+
return Binding.rowGetBooleanValue(pointer, index);
52+
}
53+
54+
public String getString(int index) {
55+
return Binding.rowGetStringValue(pointer, index);
56+
}
57+
58+
@Override
59+
public void close() {
60+
if (!isClosed) {
61+
isClosed = true;
62+
Binding.rowClose(pointer);
63+
}
64+
}
65+
}

java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,22 @@ public class Binding {
2121

2222
public static native int vnodeCount();
2323

24-
// iterator method
24+
// hummock iterator method
2525
// Return a pointer to the iterator
26-
static native long iteratorNew(byte[] readPlan);
26+
static native long hummockIteratorNew(byte[] readPlan);
2727

2828
// return a pointer to the next row
29-
static native long iteratorNext(long pointer);
29+
static native long hummockIteratorNext(long pointer);
3030

3131
// Since the underlying rust does not have garbage collection, we will have to manually call
3232
// close on the iterator to release the iterator instance pointed by the pointer.
33-
static native void iteratorClose(long pointer);
33+
static native void hummockIteratorClose(long pointer);
3434

3535
// row method
3636
static native byte[] rowGetKey(long pointer);
3737

38+
static native int rowGetOp(long pointer);
39+
3840
static native boolean rowIsNull(long pointer, int index);
3941

4042
static native short rowGetInt16Value(long pointer, int index);
@@ -54,4 +56,11 @@ public class Binding {
5456
// Since the underlying rust does not have garbage collection, we will have to manually call
5557
// close on the row to release the row instance pointed by the pointer.
5658
static native void rowClose(long pointer);
59+
60+
// stream chunk iterator method
61+
static native long streamChunkIteratorNew(byte[] streamChunkPayload);
62+
63+
static native long streamChunkIteratorNext(long pointer);
64+
65+
static native void streamChunkIteratorClose(long pointer);
5766
}

0 commit comments

Comments
 (0)