Skip to content

Commit cf0940c

Browse files
authored
refactor(connector): avoid using macro in parser (#10120)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 572780b commit cf0940c

File tree

19 files changed

+353
-236
lines changed

19 files changed

+353
-236
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/batch/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
#![allow(incomplete_features)]
1615
#![expect(dead_code)]
1716
#![allow(clippy::derive_partial_eq_without_eq)]
1817
#![feature(trait_alias)]

src/connector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "waruto
2222
"xz",
2323
] }
2424
async-trait = "0.1"
25+
auto_enums = { version = "0.8", features = ["futures03"] }
2526
aws-config = { workspace = true }
2627
aws-sdk-ec2 = { workspace = true }
2728
aws-sdk-kinesis = { workspace = true }

src/connector/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,8 @@
2626
#![feature(let_chains)]
2727
#![feature(box_into_inner)]
2828
#![feature(type_alias_impl_trait)]
29+
#![feature(return_position_impl_trait_in_trait)]
30+
#![feature(async_fn_in_trait)]
2931

3032
use std::time::Duration;
3133

src/connector/src/macros.rs

Lines changed: 0 additions & 74 deletions
Original file line number | Diff line number | Diff line change
@@ -180,86 +180,12 @@ macro_rules! impl_connector_properties {
180180
}
181181
}
182182

183-
#[macro_export]
184-
macro_rules! impl_common_parser_logic {
185-
($parser_name:ty) => {
186-
impl $parser_name {
187-
#[allow(unused_mut)]
188-
#[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = RwError)]
189-
async fn into_chunk_stream(mut self, data_stream: $crate::source::BoxSourceStream) {
190-
#[for_await]
191-
for batch in data_stream {
192-
let batch = batch?;
193-
let mut builder =
194-
$crate::parser::SourceStreamChunkBuilder::with_capacity(self.rw_columns.clone(), batch.len());
195-
let mut split_offset_mapping: std::collections::HashMap<$crate::source::SplitId, String> = std::collections::HashMap::new();
196-
197-
for msg in batch {
198-
if let Some(content) = msg.payload {
199-
split_offset_mapping.insert(msg.split_id, msg.offset);
200-
201-
let old_op_num = builder.op_num();
202-
203-
if let Err(e) = self.parse_inner(content, builder.row_writer())
204-
.await
205-
{
206-
tracing::warn!("message parsing failed {}, skipping", e.to_string());
207-
// This will throw an error for batch
208-
self.source_ctx.report_user_source_error(e)?;
209-
continue;
210-
}
211-
212-
let new_op_num = builder.op_num();
213-
214-
// new_op_num - old_op_num is the number of rows added to the builder
215-
for _ in old_op_num..new_op_num {
216-
// TODO: support more kinds of SourceMeta
217-
if let $crate::source::SourceMeta::Kafka(kafka_meta) = msg.meta.clone() {
218-
let f = |desc: &SourceColumnDesc| -> Option<risingwave_common::types::Datum> {
219-
if !desc.is_meta {
220-
return None;
221-
}
222-
match desc.name.as_str() {
223-
"_rw_kafka_timestamp" => Some(
224-
kafka_meta
225-
.timestamp
226-
.map(|ts| risingwave_common::cast::i64_to_timestamptz(ts).unwrap().into()),
227-
),
228-
_ => unreachable!(
229-
"kafka will not have this meta column: {}",
230-
desc.name
231-
),
232-
}
233-
};
234-
builder.row_writer().fulfill_meta_column(f)?;
235-
}
236-
}
237-
}
238-
}
239-
yield $crate::source::StreamChunkWithState {
240-
chunk: builder.finish(),
241-
split_offset_mapping: Some(split_offset_mapping),
242-
};
243-
}
244-
}
245-
}
246-
247-
impl $crate::parser::ByteStreamSourceParser for $parser_name {
248-
fn into_stream(self, data_stream: $crate::source::BoxSourceStream) -> $crate::source::BoxSourceWithStateStream {
249-
self.into_chunk_stream(data_stream)
250-
}
251-
}
252-
253-
}
254-
}
255-
256183
#[macro_export]
257184
macro_rules! impl_common_split_reader_logic {
258185
($reader:ty, $props:ty) => {
259186
impl $reader {
260187
#[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = risingwave_common::error::RwError)]
261188
pub(crate) async fn into_chunk_stream(self) {
262-
use $crate::parser::ByteStreamSourceParser;
263189
let parser_config = self.parser_config.clone();
264190
let actor_id = self.source_ctx.source_info.actor_id.to_string();
265191
let source_id = self.source_ctx.source_info.source_id.to_string();

src/connector/src/parser/avro/parser.rs

Lines changed: 20 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,6 @@ use std::sync::Arc;
1919

2020
use apache_avro::types::Value;
2121
use apache_avro::{from_avro_datum, Reader, Schema};
22-
use futures_async_stream::try_stream;
2322
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
2423
use risingwave_common::error::{Result, RwError};
2524
use risingwave_pb::plan_common::ColumnDesc;
@@ -28,14 +27,11 @@ use url::Url;
2827
use super::schema_resolver::*;
2928
use super::util::{extract_inner_field_schema, from_avro_value};
3029
use crate::common::UpsertMessage;
31-
use crate::impl_common_parser_logic;
3230
use crate::parser::avro::util::avro_field_to_column_desc;
3331
use crate::parser::schema_registry::{extract_schema_id, Client};
3432
use crate::parser::util::get_kafka_topic;
35-
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
36-
use crate::source::{SourceColumnDesc, SourceContextRef};
37-
38-
impl_common_parser_logic!(AvroParser);
33+
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard};
34+
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
3935

4036
#[derive(Debug)]
4137
pub struct AvroParser {
@@ -335,6 +331,24 @@ impl AvroParser {
335331
}
336332
}
337333

334+
impl ByteStreamSourceParser for AvroParser {
335+
fn columns(&self) -> &[SourceColumnDesc] {
336+
&self.rw_columns
337+
}
338+
339+
fn source_ctx(&self) -> &SourceContext {
340+
&self.source_ctx
341+
}
342+
343+
async fn parse_one<'a>(
344+
&'a mut self,
345+
payload: Vec<u8>,
346+
writer: SourceStreamChunkRowWriter<'a>,
347+
) -> Result<WriteGuard> {
348+
self.parse_inner(payload, writer).await
349+
}
350+
}
351+
338352
#[cfg(test)]
339353
mod test {
340354
use std::collections::HashMap;

src/connector/src/parser/canal/simd_json_parser.rs

Lines changed: 20 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -13,26 +13,23 @@
1313
// limitations under the License.
1414

1515
use anyhow::anyhow;
16-
use futures_async_stream::try_stream;
1716
use risingwave_common::error::ErrorCode::ProtocolError;
1817
use risingwave_common::error::{Result, RwError};
1918
use risingwave_common::types::{DataType, Datum};
2019
use risingwave_common::util::iter_util::ZipEqFast;
2120
use simd_json::{BorrowedValue, StaticNode, ValueAccess};
2221

23-
use crate::impl_common_parser_logic;
2422
use crate::parser::canal::operators::*;
2523
use crate::parser::common::{do_parse_simd_json_value, json_object_smart_get_value};
2624
use crate::parser::util::at_least_one_ok;
27-
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
28-
use crate::source::{SourceColumnDesc, SourceContextRef, SourceFormat};
25+
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard};
26+
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceFormat};
2927

3028
const AFTER: &str = "data";
3129
const BEFORE: &str = "old";
3230
const OP: &str = "type";
3331
const IS_DDL: &str = "isDdl";
3432

35-
impl_common_parser_logic!(CanalJsonParser);
3633
#[derive(Debug)]
3734
pub struct CanalJsonParser {
3835
pub(crate) rw_columns: Vec<SourceColumnDesc>,
@@ -197,6 +194,24 @@ fn cannal_simd_json_parse_value(
197194
}
198195
}
199196

197+
impl ByteStreamSourceParser for CanalJsonParser {
198+
fn columns(&self) -> &[SourceColumnDesc] {
199+
&self.rw_columns
200+
}
201+
202+
fn source_ctx(&self) -> &SourceContext {
203+
&self.source_ctx
204+
}
205+
206+
async fn parse_one<'a>(
207+
&'a mut self,
208+
payload: Vec<u8>,
209+
writer: SourceStreamChunkRowWriter<'a>,
210+
) -> Result<WriteGuard> {
211+
self.parse_inner(payload, writer).await
212+
}
213+
}
214+
200215
#[cfg(test)]
201216
mod tests {
202217
use std::str::FromStr;

src/connector/src/parser/csv_parser.rs

Lines changed: 21 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -15,16 +15,15 @@
1515
use std::str::FromStr;
1616

1717
use anyhow::anyhow;
18-
use futures_async_stream::try_stream;
1918
use risingwave_common::cast::{str_to_date, str_to_timestamp, str_with_time_zone_to_timestamptz};
2019
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
2120
use risingwave_common::error::{Result, RwError};
2221
use risingwave_common::types::{Datum, Decimal, ScalarImpl};
2322

24-
use crate::impl_common_parser_logic;
23+
use super::ByteStreamSourceParser;
2524
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
26-
use crate::source::{DataType, SourceColumnDesc, SourceContextRef};
27-
impl_common_parser_logic!(CsvParser);
25+
use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef};
26+
2827
macro_rules! to_rust_type {
2928
($v:ident, $t:ty) => {
3029
$v.parse::<$t>()
@@ -149,6 +148,24 @@ impl CsvParser {
149148
}
150149
}
151150

151+
impl ByteStreamSourceParser for CsvParser {
152+
fn columns(&self) -> &[SourceColumnDesc] {
153+
&self.rw_columns
154+
}
155+
156+
fn source_ctx(&self) -> &SourceContext {
157+
&self.source_ctx
158+
}
159+
160+
async fn parse_one<'a>(
161+
&'a mut self,
162+
payload: Vec<u8>,
163+
writer: SourceStreamChunkRowWriter<'a>,
164+
) -> Result<WriteGuard> {
165+
self.parse_inner(payload, writer).await
166+
}
167+
}
168+
152169
#[cfg(test)]
153170
mod tests {
154171
use risingwave_common::array::Op;

src/connector/src/parser/debezium/avro_parser.rs

Lines changed: 20 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@ use std::sync::Arc;
1818

1919
use apache_avro::types::Value;
2020
use apache_avro::{from_avro_datum, Schema};
21-
use futures_async_stream::try_stream;
2221
use itertools::Itertools;
2322
use reqwest::Url;
2423
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
@@ -27,24 +26,21 @@ use risingwave_pb::plan_common::ColumnDesc;
2726

2827
use super::operators::*;
2928
use crate::common::UpsertMessage;
30-
use crate::impl_common_parser_logic;
3129
use crate::parser::avro::schema_resolver::ConfluentSchemaResolver;
3230
use crate::parser::avro::util::{
3331
avro_field_to_column_desc, extract_inner_field_schema, from_avro_value,
3432
get_field_from_avro_value,
3533
};
3634
use crate::parser::schema_registry::{extract_schema_id, Client};
3735
use crate::parser::util::get_kafka_topic;
38-
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
39-
use crate::source::{SourceColumnDesc, SourceContextRef};
36+
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard};
37+
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
4038

4139
const BEFORE: &str = "before";
4240
const AFTER: &str = "after";
4341
const OP: &str = "op";
4442
const PAYLOAD: &str = "payload";
4543

46-
impl_common_parser_logic!(DebeziumAvroParser);
47-
4844
// TODO: avoid duplicated codes with `AvroParser`
4945
#[derive(Debug)]
5046
pub struct DebeziumAvroParser {
@@ -291,6 +287,24 @@ impl DebeziumAvroParser {
291287
}
292288
}
293289

290+
impl ByteStreamSourceParser for DebeziumAvroParser {
291+
fn columns(&self) -> &[SourceColumnDesc] {
292+
&self.rw_columns
293+
}
294+
295+
fn source_ctx(&self) -> &SourceContext {
296+
&self.source_ctx
297+
}
298+
299+
async fn parse_one<'a>(
300+
&'a mut self,
301+
payload: Vec<u8>,
302+
writer: SourceStreamChunkRowWriter<'a>,
303+
) -> Result<WriteGuard> {
304+
self.parse_inner(payload, writer).await
305+
}
306+
}
307+
294308
#[cfg(test)]
295309
mod tests {
296310
use std::io::Read;

src/connector/src/parser/debezium/mongo_json_parser.rs

Lines changed: 20 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -14,22 +14,19 @@
1414

1515
use std::fmt::Debug;
1616

17-
use futures_async_stream::try_stream;
1817
use risingwave_common::error::ErrorCode::ProtocolError;
1918
use risingwave_common::error::{Result, RwError};
2019
use risingwave_common::types::{DataType, Datum, ScalarImpl};
2120
use simd_json::{BorrowedValue, StaticNode, ValueAccess};
2221

2322
use super::operators::*;
24-
use crate::impl_common_parser_logic;
25-
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
26-
use crate::source::{SourceColumnDesc, SourceContextRef};
23+
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter, WriteGuard};
24+
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
2725

2826
const BEFORE: &str = "before";
2927
const AFTER: &str = "after";
3028
const OP: &str = "op";
3129

32-
impl_common_parser_logic!(DebeziumMongoJsonParser);
3330
#[inline]
3431
fn ensure_not_null<'a, 'b: 'a>(value: &'a BorrowedValue<'b>) -> Option<&'a BorrowedValue<'b>> {
3532
if let BorrowedValue::Static(StaticNode::Null) = value {
@@ -272,6 +269,24 @@ impl DebeziumMongoJsonParser {
272269
}
273270
}
274271

272+
impl ByteStreamSourceParser for DebeziumMongoJsonParser {
273+
fn columns(&self) -> &[SourceColumnDesc] {
274+
&self.rw_columns
275+
}
276+
277+
fn source_ctx(&self) -> &SourceContext {
278+
&self.source_ctx
279+
}
280+
281+
async fn parse_one<'a>(
282+
&'a mut self,
283+
payload: Vec<u8>,
284+
writer: SourceStreamChunkRowWriter<'a>,
285+
) -> Result<WriteGuard> {
286+
self.parse_inner(payload, writer).await
287+
}
288+
}
289+
275290
#[cfg(test)]
276291
mod tests {
277292
use risingwave_common::array::Op;

0 commit comments

Comments
 (0)