Skip to content

Commit ef3435e

Browse files
authored
refactor(frontend): disallow creating new avro sources without schema registry (#15256)
1 parent fbafd75 commit ef3435e

File tree

3 files changed

+40
-22
lines changed

3 files changed

+40
-22
lines changed

e2e_test/source/basic/kafka.slt

+11-3
Original file line numberDiff line numberDiff line change
@@ -163,21 +163,29 @@ create table s8_no_schema_field (
163163
properties.bootstrap.server = 'message_queue:29092'
164164
) FORMAT DEBEZIUM ENCODE JSON
165165

166-
statement ok
166+
statement error without schema registry
167167
create table s9 with (
168168
connector = 'kafka',
169169
topic = 'avro_bin',
170170
properties.bootstrap.server = 'message_queue:29092',
171171
scan.startup.mode = 'earliest'
172172
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-simple-schema.avsc');
173173

174+
statement ok
175+
create table s9 with (
176+
connector = 'kafka',
177+
topic = 'avro_bin',
178+
properties.bootstrap.server = 'message_queue:29092',
179+
scan.startup.mode = 'earliest'
180+
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-simple-schema.avsc', with_deprecated_file_header = true);
181+
174182
statement ok
175183
create table s10 with (
176184
connector = 'kafka',
177185
topic = 'avro_c_bin',
178186
properties.bootstrap.server = 'message_queue:29092',
179187
scan.startup.mode = 'earliest'
180-
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc');
188+
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc', with_deprecated_file_header = true);
181189

182190
statement ok
183191
create table s11 with (
@@ -282,7 +290,7 @@ create source s18 with (
282290
topic = 'avro_c_bin',
283291
properties.bootstrap.server = 'message_queue:29092',
284292
scan.startup.mode = 'earliest'
285-
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc');
293+
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc', with_deprecated_file_header = true);
286294

287295
# we cannot use confluent schema registry when connector is not kafka
288296
statement error

e2e_test/source/basic/old_row_format_syntax/kafka.slt

+19-19
Original file line numberDiff line numberDiff line change
@@ -155,15 +155,15 @@ create table s8_no_schema_field (
155155
properties.bootstrap.server = 'message_queue:29092'
156156
) ROW FORMAT DEBEZIUM_JSON
157157

158-
statement ok
158+
statement error without schema registry
159159
create table s9 with (
160160
connector = 'kafka',
161161
topic = 'avro_bin',
162162
properties.bootstrap.server = 'message_queue:29092',
163163
scan.startup.mode = 'earliest'
164164
) row format avro row schema location 'file:///risingwave/avro-simple-schema.avsc'
165165

166-
statement ok
166+
statement error without schema registry
167167
create table s10 with (
168168
connector = 'kafka',
169169
topic = 'avro_c_bin',
@@ -262,7 +262,7 @@ create source s17 with (
262262
scan.startup.mode = 'earliest'
263263
) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema'
264264

265-
statement ok
265+
statement error without schema registry
266266
create source s18 with (
267267
connector = 'kafka',
268268
topic = 'avro_c_bin',
@@ -560,15 +560,15 @@ select id, first_name, last_name, email from s8_no_schema_field;
560560
1004 Anne1 Kretchmar [email protected]
561561
1005 add add2 add
562562

563-
query IITFFBTT
564-
select id, sequence_id, name, score, avg_score, is_lasted, entrance_date, birthday, passed from s9;
565-
----
566-
32 64 str_value 32 64 t 1970-01-01 1970-01-01 00:00:00+00:00 1 mon 1 day 00:00:01
567-
568-
query ITITT
569-
select id, code, timestamp, xfas, contacts, sex from s10;
570-
----
571-
100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE
563+
# query IITFFBTT
564+
# select id, sequence_id, name, score, avg_score, is_lasted, entrance_date, birthday, passed from s9;
565+
# ----
566+
# 32 64 str_value 32 64 t 1970-01-01 1970-01-01 00:00:00+00:00 1 mon 1 day 00:00:01
567+
#
568+
# query ITITT
569+
# select id, code, timestamp, xfas, contacts, sex from s10;
570+
# ----
571+
# 100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE
572572

573573
query ITITT
574574
select id, code, timestamp, xfas, contacts, sex from s11;
@@ -706,11 +706,11 @@ drop table s8
706706
statement ok
707707
drop table s8_no_schema_field
708708

709-
statement ok
710-
drop table s9
711-
712-
statement ok
713-
drop table s10
709+
# statement ok
710+
# drop table s9
711+
#
712+
# statement ok
713+
# drop table s10
714714

715715
statement ok
716716
drop table s11
@@ -733,8 +733,8 @@ drop table s16
733733
statement ok
734734
drop source s17
735735

736-
statement ok
737-
drop source s18
736+
# statement ok
737+
# drop source s18
738738

739739
statement ok
740740
drop table s20

src/frontend/src/handler/create_source.rs

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use either::Either;
2121
use itertools::Itertools;
2222
use maplit::{convert_args, hashmap};
2323
use pgwire::pg_response::{PgResponse, StatementType};
24+
use risingwave_common::bail_not_implemented;
2425
use risingwave_common::catalog::{
2526
is_column_ids_dedup, ColumnCatalog, ColumnDesc, Schema, TableId, INITIAL_SOURCE_VERSION_ID,
2627
KAFKA_TIMESTAMP_COLUMN_NAME,
@@ -150,6 +151,15 @@ async fn extract_avro_table_schema(
150151
let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?;
151152
conf.map_to_columns()?
152153
} else {
154+
if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) =
155+
&parser_config.encoding_config
156+
&& !avro_props.use_schema_registry
157+
&& !format_encode_options
158+
.get("with_deprecated_file_header")
159+
.is_some_and(|v| v == "true")
160+
{
161+
bail_not_implemented!(issue = 12871, "avro without schema registry");
162+
}
153163
let conf = AvroParserConfig::new(parser_config.encoding_config).await?;
154164
conf.map_to_columns()?
155165
};

0 commit comments

Comments
 (0)