Skip to content

Commit 45e5795

Browse files
committed
feat(sink): protobuf with schema registry
1 parent e27d13c commit 45e5795

File tree

4 files changed

+85
-8
lines changed

4 files changed

+85
-8
lines changed

src/connector/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#![expect(dead_code)]
1616
#![allow(clippy::derive_partial_eq_without_eq)]
17+
#![feature(array_chunks)]
1718
#![feature(coroutines)]
1819
#![feature(proc_macro_hygiene)]
1920
#![feature(stmt_expr_attributes)]

src/connector/src/sink/encoder/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub mod template;
2727

2828
pub use avro::{AvroEncoder, AvroHeader};
2929
pub use json::JsonEncoder;
30-
pub use proto::ProtoEncoder;
30+
pub use proto::{ProtoEncoder, ProtoHeader};
3131

3232
/// Encode a row of a relation into
3333
/// * an object in json

src/connector/src/sink/encoder/proto.rs

+80-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use bytes::Bytes;
15+
use bytes::{BufMut, Bytes};
1616
use prost::Message;
1717
use prost_reflect::{
1818
DynamicMessage, FieldDescriptor, Kind, MessageDescriptor, ReflectMessage, Value,
@@ -30,13 +30,25 @@ pub struct ProtoEncoder {
3030
schema: Schema,
3131
col_indices: Option<Vec<usize>>,
3232
descriptor: MessageDescriptor,
33+
header: ProtoHeader,
34+
}
35+
36+
/// Header written in front of each protobuf-encoded message.
#[derive(Debug, Clone, Copy)]
pub enum ProtoHeader {
    /// No header: the output is the bare encoded message.
    None,
    /// Confluent Schema Registry wire format, carrying the schema ID.
    ///
    /// See <https://docs.confluent.io/platform/7.5/schema-registry/fundamentals/serdes-develop/index.html#messages-wire-format>.
    ///
    /// Layout of the prefix:
    /// * one magic byte `0x00`
    /// * the schema ID as a 4-byte big-endian integer
    ConfluentSchemaRegistry(i32),
}
3445

3546
impl ProtoEncoder {
3647
pub fn new(
3748
schema: Schema,
3849
col_indices: Option<Vec<usize>>,
3950
descriptor: MessageDescriptor,
51+
header: ProtoHeader,
4052
) -> SinkResult<Self> {
4153
match &col_indices {
4254
Some(col_indices) => validate_fields(
@@ -59,12 +71,18 @@ impl ProtoEncoder {
5971
schema,
6072
col_indices,
6173
descriptor,
74+
header,
6275
})
6376
}
6477
}
6578

79+
pub struct ProtoEncoded {
80+
message: DynamicMessage,
81+
header: ProtoHeader,
82+
}
83+
6684
impl RowEncoder for ProtoEncoder {
67-
type Output = DynamicMessage;
85+
type Output = ProtoEncoded;
6886

6987
fn schema(&self) -> &Schema {
7088
&self.schema
@@ -87,12 +105,68 @@ impl RowEncoder for ProtoEncoder {
87105
&self.descriptor,
88106
)
89107
.map_err(Into::into)
108+
.map(|m| ProtoEncoded {
109+
message: m,
110+
header: self.header,
111+
})
90112
}
91113
}
92114

93-
impl SerTo<Vec<u8>> for DynamicMessage {
115+
impl SerTo<Vec<u8>> for ProtoEncoded {
94116
fn ser_to(self) -> SinkResult<Vec<u8>> {
95-
Ok(self.encode_to_vec())
117+
let mut buf = Vec::new();
118+
match self.header {
119+
ProtoHeader::None => { /* noop */ }
120+
ProtoHeader::ConfluentSchemaRegistry(schema_id) => {
121+
buf.reserve(1 + 4);
122+
buf.put_u8(0);
123+
buf.put_i32(schema_id);
124+
MessageIndexes::from(self.message.descriptor()).encode(&mut buf);
125+
}
126+
}
127+
self.message.encode(&mut buf).unwrap();
128+
Ok(buf)
129+
}
130+
}
131+
132+
struct MessageIndexes(Vec<i32>);
133+
134+
impl MessageIndexes {
135+
fn from(desc: MessageDescriptor) -> Self {
136+
// https://github.com/protocolbuffers/protobuf/blob/v25.1/src/google/protobuf/descriptor.proto
137+
// https://docs.rs/prost-reflect/0.12.0/src/prost_reflect/descriptor/tag.rs.html
138+
// https://docs.rs/prost-reflect/0.12.0/src/prost_reflect/descriptor/build/visit.rs.html#125
139+
// `FileDescriptorProto` field #4 is `repeated DescriptorProto message_type`
140+
const TAG_FILE_MESSAGE: i32 = 4;
141+
// `DescriptorProto` field #3 is `repeated DescriptorProto nested_type`
142+
const TAG_MESSAGE_NESTED: i32 = 3;
143+
144+
let mut indexes = vec![];
145+
let mut path = desc.path().array_chunks();
146+
let &[tag, idx] = path.next().unwrap();
147+
assert_eq!(tag, TAG_FILE_MESSAGE);
148+
indexes.push(idx);
149+
for &[tag, idx] in path {
150+
assert_eq!(tag, TAG_MESSAGE_NESTED);
151+
indexes.push(idx);
152+
}
153+
Self(indexes)
154+
}
155+
156+
fn zig_i32(value: i32, buf: &mut impl BufMut) {
157+
let unsigned = ((value << 1) ^ (value >> 31)) as u32 as u64;
158+
prost::encoding::encode_varint(unsigned, buf);
159+
}
160+
161+
fn encode(&self, buf: &mut impl BufMut) {
162+
if self.0 == [0] {
163+
buf.put_u8(0);
164+
return;
165+
}
166+
Self::zig_i32(self.0.len().try_into().unwrap(), buf);
167+
for &idx in &self.0 {
168+
Self::zig_i32(idx, buf);
169+
}
96170
}
97171
}
98172

@@ -367,7 +441,8 @@ mod tests {
367441
Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(3))),
368442
]);
369443

370-
let encoder = ProtoEncoder::new(schema, None, descriptor.clone()).unwrap();
444+
let encoder =
445+
ProtoEncoder::new(schema, None, descriptor.clone(), ProtoHeader::None).unwrap();
371446
let m = encoder.encode(row).unwrap();
372447
let encoded: Vec<u8> = m.ser_to().unwrap();
373448
assert_eq!(

src/connector/src/sink/formatter/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use super::encoder::{
3333
};
3434
use super::redis::{KEY_FORMAT, VALUE_FORMAT};
3535
use crate::sink::encoder::{
36-
AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, TimestampHandlingMode,
36+
AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, ProtoHeader, TimestampHandlingMode,
3737
};
3838

3939
/// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format,
@@ -127,7 +127,8 @@ impl SinkFormatterImpl {
127127
crate::schema::protobuf::fetch_descriptor(&format_desc.options, None)
128128
.await
129129
.map_err(|e| SinkError::Config(anyhow!(e)))?;
130-
let val_encoder = ProtoEncoder::new(schema, None, descriptor)?;
130+
let val_encoder =
131+
ProtoEncoder::new(schema, None, descriptor, ProtoHeader::None)?;
131132
let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
132133
Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
133134
}

0 commit comments

Comments
 (0)