feat(sink): support encode protobuf with confluent schema registry #15546

Merged · 4 commits · Mar 13, 2024
10 changes: 9 additions & 1 deletion ci/scripts/e2e-kafka-sink-test.sh
@@ -143,14 +143,22 @@ sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1

# test different encoding
echo "preparing confluent schema registry"
python3 -m pip install requests confluent-kafka

echo "testing protobuf"
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --create > /dev/null 2>&1
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1

echo "testing avro"
python3 -m pip install requests confluent-kafka
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1
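As a hedged aside, the registration step above can be double-checked against the Schema Registry REST API. A hypothetical Rust helper, not part of this PR; it assumes the `reqwest` crate (with the `blocking` and `json` features) and `serde_json`:

// Fetch the latest version of a subject; a successful response with an "id"
// field confirms that register_schema.py stored the schema.
fn latest_schema_id(registry: &str, subject: &str) -> Result<i64, Box<dyn std::error::Error>> {
    let url = format!("{registry}/subjects/{subject}/versions/latest");
    let body: serde_json::Value = reqwest::blocking::get(&url)?.json()?;
    body["id"].as_i64().ok_or_else(|| "missing id in response".into())
}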
56 changes: 56 additions & 0 deletions e2e_test/sink/kafka/protobuf.slt
@@ -7,6 +7,24 @@ format plain encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

statement ok
create table from_kafka_csr_trivial with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf-csr-a',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.registry = 'http://message_queue:8081',
message = 'test.package.MessageA');

statement ok
create table from_kafka_csr_nested with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf-csr-hi',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.registry = 'http://message_queue:8081',
message = 'test.package.MessageH.MessageI');

statement ok
create table into_kafka (
bool_field bool,
@@ -43,6 +61,26 @@ format plain encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

statement ok
create sink sink_csr_trivial as select string_field as field_a from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf-csr-a',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.registry = 'http://message_queue:8081',
message = 'test.package.MessageA');

statement ok
create sink sink_csr_nested as select sint32_field as field_i from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf-csr-hi',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.registry = 'http://message_queue:8081',
message = 'test.package.MessageH.MessageI');

sleep 2s

query TTTRRIIIIIITTTI
@@ -66,6 +104,18 @@ select
t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,"") {4,0,4} (1136239445,0) 42
f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0

query T
select field_a from from_kafka_csr_trivial order by 1;
----
Rising
Wave

query I
select field_i from from_kafka_csr_nested order by 1;
----
13
24

statement error No such file
create sink sink_err from into_kafka with (
connector = 'kafka',
@@ -96,6 +146,12 @@ format plain encode protobuf (
schema.location = 's3:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

statement ok
drop sink sink_csr_nested;

statement ok
drop sink sink_csr_trivial;

statement ok
drop sink sink0;

16 changes: 11 additions & 5 deletions e2e_test/sink/kafka/register_schema.py
@@ -5,7 +5,8 @@
 def main():
     url = sys.argv[1]
     subject = sys.argv[2]
-    with open(sys.argv[3]) as f:
+    local_path = sys.argv[3]
+    with open(local_path) as f:
         schema_str = f.read()
     if 4 < len(sys.argv):
         keys = sys.argv[4].split(',')
@@ -14,11 +15,16 @@ def main():

     client = SchemaRegistryClient({"url": url})
 
-    if keys:
-        schema_str = select_keys(schema_str, keys)
+    if local_path.endswith('.avsc'):
+        if keys:
+            schema_str = select_keys(schema_str, keys)
+        else:
+            schema_str = remove_unsupported(schema_str)
+        schema = Schema(schema_str, 'AVRO')
+    elif local_path.endswith('.proto'):
+        schema = Schema(schema_str, 'PROTOBUF')
     else:
-        schema_str = remove_unsupported(schema_str)
-        schema = Schema(schema_str, 'AVRO')
+        raise ValueError('{} shall end with .avsc or .proto'.format(local_path))
     client.register_schema(subject, schema)


1 change: 1 addition & 0 deletions src/connector/src/lib.rs
@@ -14,6 +14,7 @@

#![expect(dead_code)]
#![allow(clippy::derive_partial_eq_without_eq)]
#![feature(array_chunks)]
#![feature(coroutines)]
#![feature(proc_macro_hygiene)]
#![feature(stmt_expr_attributes)]
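The new gate enables `slice::array_chunks`, used by `MessageIndexes::from` in `src/connector/src/schema/protobuf.rs` below to walk a descriptor path in (tag, index) pairs. A minimal nightly-only sketch of the same pattern:

#![feature(array_chunks)] // the same nightly feature enabled above

fn main() {
    // A prost-reflect descriptor path alternates (field tag, index) pairs.
    let path: &[i32] = &[4, 1, 3, 0];
    for &[tag, idx] in path.array_chunks::<2>() {
        println!("tag={tag} index={idx}");
    }
}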
49 changes: 41 additions & 8 deletions src/connector/src/schema/protobuf.rs
@@ -17,28 +17,46 @@ use std::collections::BTreeMap;
use itertools::Itertools as _;
use prost_reflect::{DescriptorPool, FileDescriptor, MessageDescriptor};

-use super::loader::LoadedSchema;
+use super::loader::{LoadedSchema, SchemaLoader};
use super::schema_registry::Subject;
 use super::{
     invalid_option_error, InvalidOptionError, SchemaFetchError, MESSAGE_NAME_KEY,
-    SCHEMA_LOCATION_KEY,
+    SCHEMA_LOCATION_KEY, SCHEMA_REGISTRY_KEY,
 };
use crate::common::AwsAuthProps;
use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties};

 /// `aws_auth_props` is only required when reading `s3://` URL.
 pub async fn fetch_descriptor(
     format_options: &BTreeMap<String, String>,
+    topic: &str,
     aws_auth_props: Option<&AwsAuthProps>,
-) -> Result<MessageDescriptor, SchemaFetchError> {
-    let row_schema_location = format_options
-        .get(SCHEMA_LOCATION_KEY)
-        .ok_or_else(|| invalid_option_error!("{SCHEMA_LOCATION_KEY} required"))?
-        .clone();
+) -> Result<(MessageDescriptor, Option<i32>), SchemaFetchError> {
     let message_name = format_options
         .get(MESSAGE_NAME_KEY)
         .ok_or_else(|| invalid_option_error!("{MESSAGE_NAME_KEY} required"))?
         .clone();
+    let schema_location = format_options.get(SCHEMA_LOCATION_KEY);
+    let schema_registry = format_options.get(SCHEMA_REGISTRY_KEY);
Comment on lines +39 to +40

Contributor: do we also need to handle the auth for schema registry here?

Contributor (author): It is handled inside the unified SchemaLoader (#14642):

    let client_config = format_options.into();

But yes, our e2e tests use redpanda without a password, and cannot catch bugs on auth (e.g. #14755).

Contributor (author): Seems to be an involved process. Leaving it as a separate issue.
https://docs.redpanda.com/23.2/manage/security/authentication/#configure-basic-authentication

let row_schema_location = match (schema_location, schema_registry) {
(Some(_), Some(_)) => {
return Err(invalid_option_error!(
"cannot use {SCHEMA_LOCATION_KEY} and {SCHEMA_REGISTRY_KEY} together"
)
.into())
}
(None, None) => {
return Err(invalid_option_error!(
"requires one of {SCHEMA_LOCATION_KEY} or {SCHEMA_REGISTRY_KEY}"
)
.into())
}
(None, Some(_)) => {
let (md, sid) = fetch_from_registry(&message_name, format_options, topic).await?;
return Ok((md, Some(sid)));
}
(Some(url), None) => url.clone(),
};

if row_schema_location.starts_with("s3") && aws_auth_props.is_none() {
return Err(invalid_option_error!("s3 URL not supported yet").into());
@@ -59,7 +77,22 @@ pub async fn fetch_descriptor(
     let conf = ProtobufParserConfig::new(enc)
         .await
         .map_err(SchemaFetchError::YetToMigrate)?;
-    Ok(conf.message_descriptor)
+    Ok((conf.message_descriptor, None))
}

pub async fn fetch_from_registry(
message_name: &str,
format_options: &BTreeMap<String, String>,
topic: &str,
) -> Result<(MessageDescriptor, i32), SchemaFetchError> {
let loader = SchemaLoader::from_format_options(topic, format_options)?;

let (vid, vpb) = loader.load_val_schema::<FileDescriptor>().await?;

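// `get_message_by_name` returns an `Option`; a message name missing from the registered schema panics here.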
Ok((
vpb.parent_pool().get_message_by_name(message_name).unwrap(),
vid,
))
}

impl LoadedSchema for FileDescriptor {
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/mod.rs
@@ -27,7 +27,7 @@ pub mod template;

pub use avro::{AvroEncoder, AvroHeader};
pub use json::JsonEncoder;
-pub use proto::ProtoEncoder;
+pub use proto::{ProtoEncoder, ProtoHeader};

/// Encode a row of a relation into
/// * an object in json
85 changes: 80 additions & 5 deletions src/connector/src/sink/encoder/proto.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use bytes::Bytes;
+use bytes::{BufMut, Bytes};
use prost::Message;
use prost_reflect::{
DynamicMessage, FieldDescriptor, Kind, MessageDescriptor, ReflectMessage, Value,
@@ -30,13 +30,25 @@ pub struct ProtoEncoder {
schema: Schema,
col_indices: Option<Vec<usize>>,
descriptor: MessageDescriptor,
header: ProtoHeader,
}

#[derive(Debug, Clone, Copy)]
pub enum ProtoHeader {
None,
/// <https://docs.confluent.io/platform/7.5/schema-registry/fundamentals/serdes-develop/index.html#messages-wire-format>
///
/// * magic byte `0x00`
/// * 4-byte big-endian schema ID
/// * zigzag-varint message indexes (see `MessageIndexes` below)
ConfluentSchemaRegistry(i32),
}

impl ProtoEncoder {
pub fn new(
schema: Schema,
col_indices: Option<Vec<usize>>,
descriptor: MessageDescriptor,
header: ProtoHeader,
) -> SinkResult<Self> {
match &col_indices {
Some(col_indices) => validate_fields(
@@ -59,12 +71,18 @@ impl ProtoEncoder {
schema,
col_indices,
descriptor,
header,
})
}
}

pub struct ProtoEncoded {
message: DynamicMessage,
header: ProtoHeader,
}

impl RowEncoder for ProtoEncoder {
-    type Output = DynamicMessage;
+    type Output = ProtoEncoded;

fn schema(&self) -> &Schema {
&self.schema
@@ -87,12 +105,68 @@
&self.descriptor,
)
.map_err(Into::into)
.map(|m| ProtoEncoded {
message: m,
header: self.header,
})
}
}

-impl SerTo<Vec<u8>> for DynamicMessage {
+impl SerTo<Vec<u8>> for ProtoEncoded {
     fn ser_to(self) -> SinkResult<Vec<u8>> {
-        Ok(self.encode_to_vec())
+        let mut buf = Vec::new();
+        match self.header {
+            ProtoHeader::None => { /* noop */ }
+            ProtoHeader::ConfluentSchemaRegistry(schema_id) => {
+                buf.reserve(1 + 4);
+                buf.put_u8(0);
+                buf.put_i32(schema_id);
+                MessageIndexes::from(self.message.descriptor()).encode(&mut buf);
Contributor (author): This can be pre-computed and is the same for all messages.

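A rough sketch of that suggestion (hypothetical, not part of this PR): `descriptor` and the schema ID are fixed for a given sink, so the prefix could be built once, e.g. in `ProtoEncoder::new`, and copied into each payload.

// Hypothetical pre-computation of the constant Confluent prefix; assumes the
// same `MessageIndexes` helper and `bytes::BufMut` in scope as in this file.
fn precompute_prefix(descriptor: &MessageDescriptor, schema_id: i32) -> Vec<u8> {
    let mut prefix = Vec::with_capacity(1 + 4 + 4);
    prefix.put_u8(0); // magic byte
    prefix.put_i32(schema_id); // 4-byte big-endian schema ID
    MessageIndexes::from(descriptor.clone()).encode(&mut prefix);
    prefix
}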
+            }
+        }
+        self.message.encode(&mut buf).unwrap();
+        Ok(buf)
     }
 }

struct MessageIndexes(Vec<i32>);

impl MessageIndexes {
fn from(desc: MessageDescriptor) -> Self {
// https://github.com/protocolbuffers/protobuf/blob/v25.1/src/google/protobuf/descriptor.proto
// https://docs.rs/prost-reflect/0.12.0/src/prost_reflect/descriptor/tag.rs.html
// https://docs.rs/prost-reflect/0.12.0/src/prost_reflect/descriptor/build/visit.rs.html#125
// `FileDescriptorProto` field #4 is `repeated DescriptorProto message_type`
const TAG_FILE_MESSAGE: i32 = 4;
// `DescriptorProto` field #3 is `repeated DescriptorProto nested_type`
const TAG_MESSAGE_NESTED: i32 = 3;

let mut indexes = vec![];
let mut path = desc.path().array_chunks();
let &[tag, idx] = path.next().unwrap();
assert_eq!(tag, TAG_FILE_MESSAGE);
indexes.push(idx);
for &[tag, idx] in path {
assert_eq!(tag, TAG_MESSAGE_NESTED);
indexes.push(idx);
}
Self(indexes)
}

fn zig_i32(value: i32, buf: &mut impl BufMut) {
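// ZigZag maps i32 to unsigned (0→0, -1→1, 1→2, -2→3, …) before varint encoding.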
let unsigned = ((value << 1) ^ (value >> 31)) as u32 as u64;
prost::encoding::encode_varint(unsigned, buf);
}

fn encode(&self, buf: &mut impl BufMut) {
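// The Confluent wire format allows the common case of indexes [0]
// (the first top-level message) to be encoded as the single byte 0.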
if self.0 == [0] {
buf.put_u8(0);
return;
}
Self::zig_i32(self.0.len().try_into().unwrap(), buf);
for &idx in &self.0 {
Self::zig_i32(idx, buf);
}
}
}

@@ -367,7 +441,8 @@ mod tests {
Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(3))),
]);

-        let encoder = ProtoEncoder::new(schema, None, descriptor.clone()).unwrap();
+        let encoder =
+            ProtoEncoder::new(schema, None, descriptor.clone(), ProtoHeader::None).unwrap();
let m = encoder.encode(row).unwrap();
let encoded: Vec<u8> = m.ser_to().unwrap();
assert_eq!(
Expand Down
Loading
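To round out the wire-format picture, a hedged consumer-side sketch (hypothetical helpers, not part of this PR) that splits a Confluent-framed payload back into schema ID, message indexes, and message bytes, mirroring the `SerTo<Vec<u8>>` implementation above:

// Parse the Confluent protobuf framing: magic byte 0x00, 4-byte big-endian
// schema ID, zigzag-varint message indexes, then the encoded protobuf message.
fn split_confluent_frame(buf: &[u8]) -> Option<(i32, Vec<i32>, &[u8])> {
    if buf.first() != Some(&0) {
        return None; // missing magic byte: not Confluent-framed
    }
    if buf.len() < 5 {
        return None;
    }
    let schema_id = i32::from_be_bytes(buf[1..5].try_into().ok()?);
    let mut rest = &buf[5..];
    // A single 0 varint is shorthand for the default message path [0].
    let count = read_zigzag(&mut rest)?;
    let indexes = if count == 0 {
        vec![0]
    } else {
        (0..count).map(|_| read_zigzag(&mut rest)).collect::<Option<Vec<_>>>()?
    };
    Some((schema_id, indexes, rest))
}

// Decode one zigzag-encoded varint from the front of `buf`, advancing it.
fn read_zigzag(buf: &mut &[u8]) -> Option<i32> {
    let slice = *buf; // copy of the shared slice reference
    let (mut acc, mut shift, mut i) = (0u64, 0u32, 0usize);
    loop {
        let b = *slice.get(i)?;
        i += 1;
        acc |= u64::from(b & 0x7f) << shift;
        if b & 0x80 == 0 {
            break;
        }
        shift += 7;
    }
    *buf = &slice[i..];
    let n = acc as u32;
    Some(((n >> 1) as i32) ^ -((n & 1) as i32))
}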