Skip to content

Commit 30b9477

Browse files
committed
e2e tests
1 parent cef10df commit 30b9477

File tree

4 files changed

+115
-6
lines changed

4 files changed

+115
-6
lines changed

ci/scripts/e2e-kafka-sink-test.sh

+9-1
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,22 @@ sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
143143
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1
144144

145145
# test different encoding
146+
echo "preparing confluent schema registry"
147+
python3 -m pip install requests confluent-kafka
148+
146149
echo "testing protobuf"
147150
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
148151
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
152+
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --create > /dev/null 2>&1
153+
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --create > /dev/null 2>&1
154+
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
155+
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
149156
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
157+
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --delete > /dev/null 2>&1
158+
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --delete > /dev/null 2>&1
150159
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
151160

152161
echo "testing avro"
153-
python3 -m pip install requests confluent-kafka
154162
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
155163
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
156164
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1

e2e_test/sink/kafka/protobuf.slt

+56
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,24 @@ format plain encode protobuf (
77
schema.location = 'file:///risingwave/proto-recursive',
88
message = 'recursive.AllTypes');
99

10+
statement ok
11+
create table from_kafka_csr_trivial with (
12+
connector = 'kafka',
13+
topic = 'test-rw-sink-append-only-protobuf-csr-a',
14+
properties.bootstrap.server = 'message_queue:29092')
15+
format plain encode protobuf (
16+
schema.registry = 'http://message_queue:8081',
17+
message = 'test.package.MessageA');
18+
19+
statement ok
20+
create table from_kafka_csr_nested with (
21+
connector = 'kafka',
22+
topic = 'test-rw-sink-append-only-protobuf-csr-hi',
23+
properties.bootstrap.server = 'message_queue:29092')
24+
format plain encode protobuf (
25+
schema.registry = 'http://message_queue:8081',
26+
message = 'test.package.MessageH.MessageI');
27+
1028
statement ok
1129
create table into_kafka (
1230
bool_field bool,
@@ -43,6 +61,26 @@ format plain encode protobuf (
4361
schema.location = 'file:///risingwave/proto-recursive',
4462
message = 'recursive.AllTypes');
4563

64+
statement ok
65+
create sink sink_csr_trivial as select string_field as field_a from into_kafka with (
66+
connector = 'kafka',
67+
topic = 'test-rw-sink-append-only-protobuf-csr-a',
68+
properties.bootstrap.server = 'message_queue:29092')
69+
format plain encode protobuf (
70+
force_append_only = true,
71+
schema.registry = 'http://message_queue:8081',
72+
message = 'test.package.MessageA');
73+
74+
statement ok
75+
create sink sink_csr_nested as select sint32_field as field_i from into_kafka with (
76+
connector = 'kafka',
77+
topic = 'test-rw-sink-append-only-protobuf-csr-hi',
78+
properties.bootstrap.server = 'message_queue:29092')
79+
format plain encode protobuf (
80+
force_append_only = true,
81+
schema.registry = 'http://message_queue:8081',
82+
message = 'test.package.MessageH.MessageI');
83+
4684
sleep 2s
4785

4886
query TTTRRIIIIIITTTI
@@ -66,6 +104,18 @@ select
66104
t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,"") {4,0,4} (1136239445,0) 42
67105
f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0
68106

107+
query T
108+
select field_a from from_kafka_csr_trivial order by 1;
109+
----
110+
Rising
111+
Wave
112+
113+
query I
114+
select field_i from from_kafka_csr_nested order by 1;
115+
----
116+
13
117+
24
118+
69119
statement error No such file
70120
create sink sink_err from into_kafka with (
71121
connector = 'kafka',
@@ -96,6 +146,12 @@ format plain encode protobuf (
96146
schema.location = 's3:///risingwave/proto-recursive',
97147
message = 'recursive.AllTypes');
98148

149+
statement ok
150+
drop sink sink_csr_nested;
151+
152+
statement ok
153+
drop sink sink_csr_trivial;
154+
99155
statement ok
100156
drop sink sink0;
101157

e2e_test/sink/kafka/register_schema.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
def main():
66
url = sys.argv[1]
77
subject = sys.argv[2]
8-
with open(sys.argv[3]) as f:
8+
local_path = sys.argv[3]
9+
with open(local_path) as f:
910
schema_str = f.read()
1011
if 4 < len(sys.argv):
1112
keys = sys.argv[4].split(',')
@@ -14,11 +15,16 @@ def main():
1415

1516
client = SchemaRegistryClient({"url": url})
1617

17-
if keys:
18-
schema_str = select_keys(schema_str, keys)
18+
if local_path.endswith('.avsc'):
19+
if keys:
20+
schema_str = select_keys(schema_str, keys)
21+
else:
22+
schema_str = remove_unsupported(schema_str)
23+
schema = Schema(schema_str, 'AVRO')
24+
elif local_path.endswith('.proto'):
25+
schema = Schema(schema_str, 'PROTOBUF')
1926
else:
20-
schema_str = remove_unsupported(schema_str)
21-
schema = Schema(schema_str, 'AVRO')
27+
raise ValueError('{} shall end with .avsc or .proto'.format(local_path))
2228
client.register_schema(subject, schema)
2329

2430

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Example taken from https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format
2+
// `test.package.MessageH.MessageI` `[1, 0]` `2, 1, 0` 0x040200
3+
// `test.package.MessageA.MessageE.MessageG` `[0, 2, 1]` `3, 0, 2, 1` 0x06000402
4+
// `test.package.MessageA` `[0]` `1, 0`/`0` 0x00
5+
6+
syntax = "proto3";
7+
package test.package;
8+
9+
message MessageA {
10+
string field_a = 1;
11+
12+
message MessageB {
13+
double field_b = 1;
14+
15+
message MessageC {
16+
sint32 field_c = 1;
17+
}
18+
}
19+
message MessageD {
20+
sint32 field_d = 1;
21+
}
22+
message MessageE {
23+
string field_e = 1;
24+
25+
message MessageF {
26+
double field_f = 1;
27+
}
28+
message MessageG {
29+
sint32 field_g = 1;
30+
}
31+
}
32+
}
33+
message MessageH {
34+
double field_h = 1;
35+
36+
message MessageI {
37+
sint32 field_i = 1;
38+
}
39+
}

0 commit comments

Comments
 (0)