Skip to content

Commit 69bde53

Browse files
authored
feat: HA for schema registry (risingwavelabs#11982)
Signed-off-by: tabVersion <[email protected]>
1 parent 477bfd4 commit 69bde53

File tree

7 files changed: 289 additions & 144 deletions

e2e_test/schema_registry/pb.slt

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,19 @@ FORMAT plain ENCODE protobuf(
1414
message = 'test.User'
1515
);
1616

17+
# Same topic as above, but `schema.registry` is a comma-separated list of URLs (multiple schema registry nodes, for HA)
18+
statement ok
19+
create table sr_pb_test_bk with (
20+
connector = 'kafka',
21+
topic = 'sr_pb_test',
22+
properties.bootstrap.server = 'message_queue:29092',
23+
scan.startup.mode = 'earliest',
24+
message = 'test.User')
25+
FORMAT plain ENCODE protobuf(
26+
schema.registry = 'http://message_queue:8081,http://message_queue:8081',
27+
message = 'test.User'
28+
);
29+
1730
# Wait for source
1831
sleep 10s
1932

@@ -33,4 +46,7 @@ select min(id), max(id) from sr_pb_test;
3346

3447

3548
statement ok
36-
drop table sr_pb_test;
49+
drop table sr_pb_test;
50+
51+
statement ok
52+
drop table sr_pb_test_bk;

src/connector/src/parser/avro/parser.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@ use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
2121
use risingwave_common::error::{Result, RwError};
2222
use risingwave_common::try_match_expand;
2323
use risingwave_pb::plan_common::ColumnDesc;
24-
use url::Url;
2524

2625
use super::schema_resolver::*;
2726
use super::util::avro_schema_to_column_descs;
28-
use crate::parser::schema_registry::{extract_schema_id, get_subject_by_strategy, Client};
27+
use crate::parser::schema_registry::{
28+
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
29+
};
2930
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
3031
use crate::parser::unified::AccessImpl;
3132
use crate::parser::util::{read_schema_from_http, read_schema_from_local, read_schema_from_s3};
@@ -115,9 +116,7 @@ impl AvroParserConfig {
115116
let avro_config = try_match_expand!(encoding_properties, EncodingProperties::Avro)?;
116117
let schema_location = &avro_config.row_schema_location;
117118
let enable_upsert = avro_config.enable_upsert;
118-
let url = Url::parse(schema_location).map_err(|e| {
119-
InternalError(format!("failed to parse url ({}): {}", schema_location, e))
120-
})?;
119+
let url = handle_sr_list(schema_location.as_str())?;
121120
if avro_config.use_schema_registry {
122121
let client = Client::new(url, &avro_config.client_config)?;
123122
let resolver = ConfluentSchemaResolver::new(client);
@@ -156,12 +155,13 @@ impl AvroParserConfig {
156155
"avro upsert without schema registry is not supported".to_string(),
157156
)));
158157
}
158+
let url = url.get(0).unwrap();
159159
let schema_content = match url.scheme() {
160160
"file" => read_schema_from_local(url.path()),
161161
"s3" => {
162-
read_schema_from_s3(&url, avro_config.aws_auth_props.as_ref().unwrap()).await
162+
read_schema_from_s3(url, avro_config.aws_auth_props.as_ref().unwrap()).await
163163
}
164-
"https" | "http" => read_schema_from_http(&url).await,
164+
"https" | "http" => read_schema_from_http(url).await,
165165
scheme => Err(RwError::from(ProtocolError(format!(
166166
"path scheme {} is not supported",
167167
scheme

src/connector/src/parser/debezium/avro_parser.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@ use std::sync::Arc;
1717

1818
use apache_avro::types::Value;
1919
use apache_avro::{from_avro_datum, Schema};
20-
use reqwest::Url;
21-
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
20+
use risingwave_common::error::ErrorCode::ProtocolError;
2221
use risingwave_common::error::{Result, RwError};
2322
use risingwave_common::try_match_expand;
2423
use risingwave_pb::plan_common::ColumnDesc;
2524

2625
use crate::parser::avro::schema_resolver::ConfluentSchemaResolver;
2726
use crate::parser::avro::util::avro_schema_to_column_descs;
28-
use crate::parser::schema_registry::{extract_schema_id, Client};
27+
use crate::parser::schema_registry::{extract_schema_id, handle_sr_list, Client};
2928
use crate::parser::unified::avro::{
3029
avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
3130
};
@@ -107,9 +106,7 @@ impl DebeziumAvroParserConfig {
107106
let schema_location = &avro_config.row_schema_location;
108107
let client_config = &avro_config.client_config;
109108
let kafka_topic = &avro_config.topic;
110-
let url = Url::parse(schema_location).map_err(|e| {
111-
InternalError(format!("failed to parse url ({}): {}", schema_location, e))
112-
})?;
109+
let url = handle_sr_list(schema_location)?;
113110
let client = Client::new(url, client_config)?;
114111
let raw_schema = client
115112
.get_schema_by_subject(format!("{}-key", &kafka_topic).as_str())

src/connector/src/parser/json_parser.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ use risingwave_common::error::ErrorCode::{self, InternalError, ProtocolError};
2020
use risingwave_common::error::{Result, RwError};
2121
use risingwave_common::try_match_expand;
2222
use risingwave_pb::plan_common::ColumnDesc;
23-
use url::Url;
2423

2524
use super::avro::schema_resolver::ConfluentSchemaResolver;
2625
use super::schema_registry::Client;
2726
use super::util::{get_kafka_topic, read_schema_from_http, read_schema_from_local};
2827
use super::{EncodingProperties, SchemaRegistryAuth, SpecificParserConfig};
2928
use crate::parser::avro::util::avro_schema_to_column_descs;
29+
use crate::parser::schema_registry::handle_sr_list;
3030
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
3131
use crate::parser::unified::util::apply_row_accessor_on_stream_chunk_writer;
3232
use crate::parser::unified::AccessImpl;
@@ -151,8 +151,7 @@ pub async fn schema_to_columns(
151151
use_schema_registry: bool,
152152
props: &HashMap<String, String>,
153153
) -> anyhow::Result<Vec<ColumnDesc>> {
154-
let url = Url::parse(schema_location)
155-
.map_err(|e| InternalError(format!("failed to parse url ({}): {}", schema_location, e)))?;
154+
let url = handle_sr_list(schema_location)?;
156155
let schema_content = if use_schema_registry {
157156
let schema_registry_auth = SchemaRegistryAuth::from(props);
158157
let client = Client::new(url, &schema_registry_auth)?;
@@ -163,9 +162,10 @@ pub async fn schema_to_columns(
163162
.await?
164163
.content
165164
} else {
165+
let url = url.get(0).unwrap();
166166
match url.scheme() {
167167
"file" => read_schema_from_local(url.path()),
168-
"https" | "http" => read_schema_from_http(&url).await,
168+
"https" | "http" => read_schema_from_http(url).await,
169169
scheme => Err(RwError::from(ProtocolError(format!(
170170
"path scheme {} is not supported",
171171
scheme

src/connector/src/parser/protobuf/parser.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ use risingwave_common::error::{Result, RwError};
2525
use risingwave_common::try_match_expand;
2626
use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl, F32, F64};
2727
use risingwave_pb::plan_common::ColumnDesc;
28-
use url::Url;
2928

3029
use super::schema_resolver::*;
3130
use crate::aws_utils::load_file_descriptor_from_s3;
32-
use crate::parser::schema_registry::{extract_schema_id, get_subject_by_strategy, Client};
31+
use crate::parser::schema_registry::{
32+
extract_schema_id, get_subject_by_strategy, handle_sr_list, Client,
33+
};
3334
use crate::parser::unified::protobuf::ProtobufAccess;
3435
use crate::parser::unified::AccessImpl;
3536
use crate::parser::{AccessBuilder, EncodingProperties};
@@ -80,8 +81,7 @@ impl ProtobufParserConfig {
8081
let protobuf_config = try_match_expand!(encoding_properties, EncodingProperties::Protobuf)?;
8182
let location = &protobuf_config.row_schema_location;
8283
let message_name = &protobuf_config.message_name;
83-
let url = Url::parse(location)
84-
.map_err(|e| InternalError(format!("failed to parse url ({}): {}", location, e)))?;
84+
let url = handle_sr_list(location.as_str())?;
8585

8686
let schema_bytes = if protobuf_config.use_schema_registry {
8787
let (schema_key, schema_value) = get_subject_by_strategy(
@@ -99,6 +99,7 @@ impl ProtobufParserConfig {
9999
let client = Client::new(url, &protobuf_config.client_config)?;
100100
compile_file_descriptor_from_schema_registry(schema_value.as_str(), &client).await?
101101
} else {
102+
let url = url.get(0).unwrap();
102103
match url.scheme() {
103104
// TODO(Tao): support local file only when it's compiled in debug mode.
104105
"file" => {
@@ -115,12 +116,12 @@ impl ProtobufParserConfig {
115116
}
116117
"s3" => {
117118
load_file_descriptor_from_s3(
118-
&url,
119+
url,
119120
protobuf_config.aws_auth_props.as_ref().unwrap(),
120121
)
121122
.await
122123
}
123-
"https" | "http" => load_file_descriptor_from_http(&url).await,
124+
"https" | "http" => load_file_descriptor_from_http(url).await,
124125
scheme => Err(RwError::from(ProtocolError(format!(
125126
"path scheme {} is not supported",
126127
scheme

0 commit comments

Comments
 (0)