Skip to content

Commit 63bddef

Browse files
committed
load protobuf from schema registry
1 parent 45e5795 commit 63bddef

File tree

2 files changed

+54
-14
lines changed

2 files changed

+54
-14
lines changed

src/connector/src/schema/protobuf.rs

+42-8
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,46 @@ use std::collections::BTreeMap;
1717
use itertools::Itertools as _;
1818
use prost_reflect::{DescriptorPool, FileDescriptor, MessageDescriptor};
1919

20-
use super::loader::LoadedSchema;
20+
use super::loader::{LoadedSchema, SchemaLoader};
2121
use super::schema_registry::Subject;
2222
use super::{
2323
invalid_option_error, InvalidOptionError, SchemaFetchError, MESSAGE_NAME_KEY,
24-
SCHEMA_LOCATION_KEY,
24+
SCHEMA_LOCATION_KEY, SCHEMA_REGISTRY_KEY,
2525
};
2626
use crate::common::AwsAuthProps;
2727
use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties};
2828

2929
/// `aws_auth_props` is only required when reading from an `s3://` URL.
3030
pub async fn fetch_descriptor(
3131
format_options: &BTreeMap<String, String>,
32+
topic: &str,
3233
aws_auth_props: Option<&AwsAuthProps>,
33-
) -> Result<MessageDescriptor, SchemaFetchError> {
34-
let row_schema_location = format_options
35-
.get(SCHEMA_LOCATION_KEY)
36-
.ok_or_else(|| invalid_option_error!("{SCHEMA_LOCATION_KEY} required"))?
37-
.clone();
34+
) -> Result<(MessageDescriptor, Option<i32>), SchemaFetchError> {
3835
let message_name = format_options
3936
.get(MESSAGE_NAME_KEY)
4037
.ok_or_else(|| invalid_option_error!("{MESSAGE_NAME_KEY} required"))?
4138
.clone();
39+
let schema_location = format_options.get(SCHEMA_LOCATION_KEY);
40+
let schema_registry = format_options.get(SCHEMA_REGISTRY_KEY);
41+
let row_schema_location = match (schema_location, schema_registry) {
42+
(Some(_), Some(_)) => {
43+
return Err(invalid_option_error!(
44+
"cannot use {SCHEMA_LOCATION_KEY} and {SCHEMA_REGISTRY_KEY} together"
45+
)
46+
.into())
47+
}
48+
(None, None) => {
49+
return Err(invalid_option_error!(
50+
"requires one of {SCHEMA_LOCATION_KEY} or {SCHEMA_REGISTRY_KEY}"
51+
)
52+
.into())
53+
}
54+
(None, Some(_)) => {
55+
let (md, sid) = fetch_from_registry(&message_name, format_options, topic).await?;
56+
return Ok((md, Some(sid)));
57+
}
58+
(Some(url), None) => url.clone(),
59+
};
4260

4361
if row_schema_location.starts_with("s3") && aws_auth_props.is_none() {
4462
return Err(invalid_option_error!("s3 URL not supported yet").into());
@@ -59,7 +77,23 @@ pub async fn fetch_descriptor(
5977
let conf = ProtobufParserConfig::new(enc)
6078
.await
6179
.map_err(SchemaFetchError::YetToMigrate)?;
62-
Ok(conf.message_descriptor)
80+
Ok((conf.message_descriptor, None))
81+
}
82+
83+
pub async fn fetch_from_registry(
84+
message_name: &str,
85+
format_options: &BTreeMap<String, String>,
86+
topic: &str,
87+
) -> Result<(MessageDescriptor, i32), SchemaFetchError> {
88+
let loader = SchemaLoader::from_format_options(topic, format_options)?;
89+
90+
let (vid, vpb) = loader.load_val_schema::<FileDescriptor>().await?;
91+
92+
// TODO: why file rather than pool or message?
93+
Ok((
94+
vpb.parent_pool().get_message_by_name(message_name).unwrap(),
95+
vid,
96+
))
6397
}
6498

6599
impl LoadedSchema for FileDescriptor {

src/connector/src/sink/formatter/mod.rs

+12-6
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,18 @@ impl SinkFormatterImpl {
123123
}
124124
SinkEncode::Protobuf => {
125125
// By passing `None` as `aws_auth_props`, reading from `s3://` is not supported yet.
126-
let descriptor =
127-
crate::schema::protobuf::fetch_descriptor(&format_desc.options, None)
128-
.await
129-
.map_err(|e| SinkError::Config(anyhow!(e)))?;
130-
let val_encoder =
131-
ProtoEncoder::new(schema, None, descriptor, ProtoHeader::None)?;
126+
let (descriptor, sid) = crate::schema::protobuf::fetch_descriptor(
127+
&format_desc.options,
128+
topic,
129+
None,
130+
)
131+
.await
132+
.map_err(|e| SinkError::Config(anyhow!(e)))?;
133+
let header = match sid {
134+
None => ProtoHeader::None,
135+
Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
136+
};
137+
let val_encoder = ProtoEncoder::new(schema, None, descriptor, header)?;
132138
let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
133139
Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
134140
}

0 commit comments

Comments
 (0)