Skip to content

Commit 19643ad

Browse files
authored
refactor(source): prefer SpecificParserConfig over SourceStruct (#12602)
1 parent 6833305 commit 19643ad

File tree

11 files changed

+132
-205
lines changed

11 files changed

+132
-205
lines changed

src/batch/src/executor/source.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use risingwave_connector::source::{
3030
};
3131
use risingwave_pb::batch_plan::plan_node::NodeBody;
3232
use risingwave_source::connector_source::ConnectorSource;
33-
use risingwave_source::source_desc::extract_source_struct;
3433

3534
use super::Executor;
3635
use crate::error::BatchError;
@@ -71,9 +70,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
7170
.map_err(|e| RwError::from(ConnectorError(e.into())))?;
7271

7372
let info = source_node.get_info().unwrap();
74-
let source_struct = extract_source_struct(info)?;
75-
let parser_config =
76-
SpecificParserConfig::new(source_struct, info, &source_node.properties)?;
73+
let parser_config = SpecificParserConfig::new(info, &source_node.properties)?;
7774

7875
let columns: Vec<_> = source_node
7976
.columns

src/connector/src/parser/avro/parser.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ mod test {
220220
use risingwave_common::types::{DataType, Date, Interval, ScalarImpl, Timestamptz};
221221
use risingwave_common::{error, try_match_expand};
222222
use risingwave_pb::catalog::StreamSourceInfo;
223+
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};
223224
use url::Url;
224225

225226
use super::{
@@ -232,7 +233,7 @@ mod test {
232233
use crate::parser::{
233234
AccessBuilderImpl, EncodingType, SourceStreamChunkBuilder, SpecificParserConfig,
234235
};
235-
use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, SourceStruct};
236+
use crate::source::SourceColumnDesc;
236237

237238
fn test_data_path(file_name: &str) -> String {
238239
let curr_dir = env::current_dir().unwrap().into_os_string();
@@ -304,13 +305,11 @@ mod test {
304305
let info = StreamSourceInfo {
305306
row_schema_location: schema_path.clone(),
306307
use_schema_registry: false,
308+
format: PbFormatType::Plain.into(),
309+
row_encode: PbEncodeType::Avro.into(),
307310
..Default::default()
308311
};
309-
let parser_config = SpecificParserConfig::new(
310-
SourceStruct::new(SourceFormat::Plain, SourceEncode::Avro),
311-
&info,
312-
&HashMap::new(),
313-
)?;
312+
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
314313
AvroParserConfig::new(parser_config.encoding_config).await
315314
}
316315

src/connector/src/parser/debezium/avro_parser.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,13 @@ mod tests {
151151
use risingwave_common::row::{OwnedRow, Row};
152152
use risingwave_common::types::{DataType, ScalarImpl};
153153
use risingwave_pb::catalog::StreamSourceInfo;
154+
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};
154155

155156
use super::*;
156157
use crate::parser::{
157158
DebeziumAvroParserConfig, DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig,
158159
};
159-
use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, SourceStruct};
160+
use crate::source::SourceColumnDesc;
160161

161162
const DEBEZIUM_AVRO_DATA: &[u8] = b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00";
162163

@@ -298,13 +299,11 @@ mod tests {
298299
));
299300
let info = StreamSourceInfo {
300301
row_schema_location: "http://127.0.0.1:8081".into(),
302+
format: PbFormatType::Debezium.into(),
303+
row_encode: PbEncodeType::Avro.into(),
301304
..Default::default()
302305
};
303-
let parser_config = SpecificParserConfig::new(
304-
SourceStruct::new(SourceFormat::Debezium, SourceEncode::Avro),
305-
&info,
306-
&props,
307-
)?;
306+
let parser_config = SpecificParserConfig::new(&info, &props)?;
308307
let config = DebeziumAvroParserConfig::new(parser_config.clone().encoding_config).await?;
309308
let columns = config
310309
.map_to_columns()?

src/connector/src/parser/mod.rs

Lines changed: 4 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ use self::util::get_kafka_topic;
4747
use crate::aws_auth::AwsAuthProps;
4848
use crate::parser::maxwell::MaxwellParser;
4949
use crate::source::{
50-
BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, SourceContextRef,
51-
SourceEncode, SourceFormat, SourceMeta, SourceStruct, SourceWithStateStream, SplitId,
50+
extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
51+
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId,
5252
StreamChunkWithState,
5353
};
5454

@@ -870,35 +870,9 @@ pub enum ProtocolProperties {
870870
}
871871

872872
impl SpecificParserConfig {
873-
pub fn get_source_struct(&self) -> SourceStruct {
874-
let format = match self.protocol_config {
875-
ProtocolProperties::Debezium => SourceFormat::Debezium,
876-
ProtocolProperties::DebeziumMongo => SourceFormat::DebeziumMongo,
877-
ProtocolProperties::Maxwell => SourceFormat::Maxwell,
878-
ProtocolProperties::Canal => SourceFormat::Canal,
879-
ProtocolProperties::Plain => SourceFormat::Plain,
880-
ProtocolProperties::Upsert => SourceFormat::Upsert,
881-
ProtocolProperties::Native => SourceFormat::Native,
882-
ProtocolProperties::Unspecified => unreachable!(),
883-
};
884-
let encode = match self.encoding_config {
885-
EncodingProperties::Avro(_) => SourceEncode::Avro,
886-
EncodingProperties::Protobuf(_) => SourceEncode::Protobuf,
887-
EncodingProperties::Csv(_) => SourceEncode::Csv,
888-
EncodingProperties::Json(_) => SourceEncode::Json,
889-
EncodingProperties::Bytes(_) => SourceEncode::Bytes,
890-
EncodingProperties::Native => SourceEncode::Native,
891-
EncodingProperties::Unspecified => unreachable!(),
892-
};
893-
SourceStruct { format, encode }
894-
}
895-
896873
// The validity of (format, encode) is ensured by `extract_source_struct`
897-
pub fn new(
898-
source_struct: SourceStruct,
899-
info: &StreamSourceInfo,
900-
props: &HashMap<String, String>,
901-
) -> Result<Self> {
874+
pub fn new(info: &StreamSourceInfo, props: &HashMap<String, String>) -> Result<Self> {
875+
let source_struct = extract_source_struct(info)?;
902876
let format = source_struct.format;
903877
let encode = source_struct.encode;
904878
// this transformation is needed since there may be config for the protocol
@@ -1026,19 +1000,3 @@ impl SpecificParserConfig {
10261000
})
10271001
}
10281002
}
1029-
1030-
impl ParserConfig {
1031-
pub fn new(
1032-
source_struct: SourceStruct,
1033-
info: &StreamSourceInfo,
1034-
props: &HashMap<String, String>,
1035-
rw_columns: &Vec<SourceColumnDesc>,
1036-
) -> Result<Self> {
1037-
let common = CommonParserConfig {
1038-
rw_columns: rw_columns.to_owned(),
1039-
};
1040-
let specific = SpecificParserConfig::new(source_struct, info, props)?;
1041-
1042-
Ok(Self { common, specific })
1043-
}
1044-
}

src/connector/src/parser/protobuf/parser.rs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -358,13 +358,13 @@ mod test {
358358
use risingwave_common::types::{DataType, StructType};
359359
use risingwave_pb::catalog::StreamSourceInfo;
360360
use risingwave_pb::data::data_type::PbTypeName;
361+
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};
361362

362363
use super::*;
363364
use crate::parser::protobuf::recursive::all_types::{EnumType, ExampleOneof, NestedMessage};
364365
use crate::parser::protobuf::recursive::AllTypes;
365366
use crate::parser::unified::Access;
366367
use crate::parser::SpecificParserConfig;
367-
use crate::source::{SourceEncode, SourceFormat, SourceStruct};
368368

369369
fn schema_dir() -> String {
370370
let dir = PathBuf::from("src/test_data");
@@ -391,13 +391,11 @@ mod test {
391391
proto_message_name: message_name.to_string(),
392392
row_schema_location: location.to_string(),
393393
use_schema_registry: false,
394+
format: PbFormatType::Plain.into(),
395+
row_encode: PbEncodeType::Protobuf.into(),
394396
..Default::default()
395397
};
396-
let parser_config = SpecificParserConfig::new(
397-
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
398-
&info,
399-
&HashMap::new(),
400-
)?;
398+
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
401399
let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;
402400
let value = DynamicMessage::decode(conf.message_descriptor, PRE_GEN_PROTO_DATA).unwrap();
403401

@@ -438,13 +436,11 @@ mod test {
438436
proto_message_name: message_name.to_string(),
439437
row_schema_location: location.to_string(),
440438
use_schema_registry: false,
439+
format: PbFormatType::Plain.into(),
440+
row_encode: PbEncodeType::Protobuf.into(),
441441
..Default::default()
442442
};
443-
let parser_config = SpecificParserConfig::new(
444-
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
445-
&info,
446-
&HashMap::new(),
447-
)?;
443+
let parser_config = SpecificParserConfig::new(&info, &HashMap::new())?;
448444
let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;
449445
let columns = conf.map_to_columns().unwrap();
450446

@@ -489,14 +485,11 @@ mod test {
489485
proto_message_name: message_name.to_string(),
490486
row_schema_location: location.to_string(),
491487
use_schema_registry: false,
488+
format: PbFormatType::Plain.into(),
489+
row_encode: PbEncodeType::Protobuf.into(),
492490
..Default::default()
493491
};
494-
let parser_config = SpecificParserConfig::new(
495-
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
496-
&info,
497-
&HashMap::new(),
498-
)
499-
.unwrap();
492+
let parser_config = SpecificParserConfig::new(&info, &HashMap::new()).unwrap();
500493
let conf = ProtobufParserConfig::new(parser_config.encoding_config)
501494
.await
502495
.unwrap();
@@ -518,14 +511,11 @@ mod test {
518511
proto_message_name: message_name.to_string(),
519512
row_schema_location: location.to_string(),
520513
use_schema_registry: false,
514+
format: PbFormatType::Plain.into(),
515+
row_encode: PbEncodeType::Protobuf.into(),
521516
..Default::default()
522517
};
523-
let parser_config = SpecificParserConfig::new(
524-
SourceStruct::new(SourceFormat::Plain, SourceEncode::Protobuf),
525-
&info,
526-
&HashMap::new(),
527-
)
528-
.unwrap();
518+
let parser_config = SpecificParserConfig::new(&info, &HashMap::new()).unwrap();
529519

530520
ProtobufParserConfig::new(parser_config.encoding_config)
531521
.await

src/connector/src/source/base.rs

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use risingwave_common::array::StreamChunk;
2828
use risingwave_common::catalog::TableId;
2929
use risingwave_common::error::{ErrorSuppressor, RwError};
3030
use risingwave_common::types::{JsonbVal, Scalar};
31-
use risingwave_pb::catalog::PbSource;
31+
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
3232
use risingwave_pb::source::ConnectorSplit;
3333
use risingwave_rpc_client::ConnectorClient;
3434
use serde::de::DeserializeOwned;
@@ -253,6 +253,67 @@ impl SourceStruct {
253253
}
254254
}
255255

256+
// Derive the `SourceStruct` (format, encode) pair described by `info`.
//
// Only a valid (format, encode) pair is ever returned:
// * if `info.get_row_format()` succeeds, the single `RowFormatType` value is
//   translated into its (format, encode) pair and returned immediately;
// * otherwise `info.get_format()` and `info.get_row_encode()` are read and the
//   pair is checked against the list of supported combinations below.
//
// Errors: a failure of `get_format()` / `get_row_encode()` is converted with
// `anyhow!("{e:?}")` and propagated; a pair not listed in the second `match`
// yields "Unsupported combination of format ... and encode ...".
//
// Panics: the `RowFormatType::RowUnspecified` arm is `unreachable!()`.
// NOTE(review): this presumably relies on `get_row_format()` never returning
// `Ok(RowUnspecified)` -- confirm against the generated getter.
257+
pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> {
258+
use risingwave_pb::plan_common::{PbEncodeType, PbFormatType, RowFormatType};
259+
260+
// Old version meta: one `row_format` value determines both format and encode.
261+
if let Ok(format) = info.get_row_format() {
262+
let (format, encode) = match format {
263+
RowFormatType::Json => (SourceFormat::Plain, SourceEncode::Json),
264+
RowFormatType::Protobuf => (SourceFormat::Plain, SourceEncode::Protobuf),
265+
RowFormatType::DebeziumJson => (SourceFormat::Debezium, SourceEncode::Json),
266+
RowFormatType::Avro => (SourceFormat::Plain, SourceEncode::Avro),
267+
RowFormatType::Maxwell => (SourceFormat::Maxwell, SourceEncode::Json),
268+
RowFormatType::CanalJson => (SourceFormat::Canal, SourceEncode::Json),
269+
RowFormatType::Csv => (SourceFormat::Plain, SourceEncode::Csv),
270+
RowFormatType::Native => (SourceFormat::Native, SourceEncode::Native),
271+
RowFormatType::DebeziumAvro => (SourceFormat::Debezium, SourceEncode::Avro),
272+
RowFormatType::UpsertJson => (SourceFormat::Upsert, SourceEncode::Json),
273+
RowFormatType::UpsertAvro => (SourceFormat::Upsert, SourceEncode::Avro),
274+
RowFormatType::DebeziumMongoJson => (SourceFormat::DebeziumMongo, SourceEncode::Json),
275+
RowFormatType::Bytes => (SourceFormat::Plain, SourceEncode::Bytes),
// Panics if reached; see the NOTE(review) in the header comment.
276+
RowFormatType::RowUnspecified => unreachable!(),
277+
};
278+
return Ok(SourceStruct::new(format, encode));
279+
}
// No usable `row_format`: read `format` and `row_encode` as separate fields.
280+
let source_format = info.get_format().map_err(|e| anyhow!("{e:?}"))?;
281+
let source_encode = info.get_row_encode().map_err(|e| anyhow!("{e:?}"))?;
282+
let (format, encode) = match (source_format, source_encode) {
283+
(PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json),
284+
(PbFormatType::Plain, PbEncodeType::Protobuf) => {
285+
(SourceFormat::Plain, SourceEncode::Protobuf)
286+
}
287+
(PbFormatType::Debezium, PbEncodeType::Json) => {
288+
(SourceFormat::Debezium, SourceEncode::Json)
289+
}
290+
(PbFormatType::Plain, PbEncodeType::Avro) => (SourceFormat::Plain, SourceEncode::Avro),
291+
(PbFormatType::Maxwell, PbEncodeType::Json) => (SourceFormat::Maxwell, SourceEncode::Json),
292+
(PbFormatType::Canal, PbEncodeType::Json) => (SourceFormat::Canal, SourceEncode::Json),
293+
(PbFormatType::Plain, PbEncodeType::Csv) => (SourceFormat::Plain, SourceEncode::Csv),
294+
(PbFormatType::Native, PbEncodeType::Native) => {
295+
(SourceFormat::Native, SourceEncode::Native)
296+
}
297+
(PbFormatType::Debezium, PbEncodeType::Avro) => {
298+
(SourceFormat::Debezium, SourceEncode::Avro)
299+
}
300+
(PbFormatType::Upsert, PbEncodeType::Json) => (SourceFormat::Upsert, SourceEncode::Json),
301+
(PbFormatType::Upsert, PbEncodeType::Avro) => (SourceFormat::Upsert, SourceEncode::Avro),
302+
(PbFormatType::DebeziumMongo, PbEncodeType::Json) => {
303+
(SourceFormat::DebeziumMongo, SourceEncode::Json)
304+
}
305+
(PbFormatType::Plain, PbEncodeType::Bytes) => (SourceFormat::Plain, SourceEncode::Bytes),
// Every pair not listed above is rejected with an error instead of a panic.
306+
(format, encode) => {
307+
return Err(anyhow!(
308+
"Unsupported combination of format {:?} and encode {:?}",
309+
format,
310+
encode
311+
));
312+
}
313+
};
314+
Ok(SourceStruct::new(format, encode))
315+
}
316+
256317
pub type BoxSourceStream = BoxStream<'static, Result<Vec<SourceMessage>>>;
257318

258319
pub trait SourceWithStateStream =

src/connector/src/source/datagen/source/generator.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ use risingwave_common::row::OwnedRow;
2323
use risingwave_common::types::DataType;
2424
use risingwave_common::util::iter_util::ZipEqFast;
2525

26-
use crate::source::{
27-
SourceEncode, SourceFormat, SourceMessage, SourceMeta, SourceStruct, SplitId,
28-
StreamChunkWithState,
29-
};
26+
use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
27+
use crate::source::{SourceMessage, SourceMeta, SplitId, StreamChunkWithState};
3028

3129
pub enum FieldDesc {
3230
// field is invisible, generate None
@@ -38,7 +36,7 @@ pub struct DatagenEventGenerator {
3836
// fields_map: HashMap<String, FieldGeneratorImpl>,
3937
field_names: Vec<String>,
4038
fields_vec: Vec<FieldDesc>,
41-
source_struct: SourceStruct,
39+
source_format: SpecificParserConfig,
4240
data_types: Vec<DataType>,
4341
offset: u64,
4442
split_id: SplitId,
@@ -56,7 +54,7 @@ impl DatagenEventGenerator {
5654
pub fn new(
5755
fields_vec: Vec<FieldDesc>,
5856
field_names: Vec<String>,
59-
source_struct: SourceStruct,
57+
source_format: SpecificParserConfig,
6058
data_types: Vec<DataType>,
6159
rows_per_second: u64,
6260
offset: u64,
@@ -72,7 +70,7 @@ impl DatagenEventGenerator {
7270
Ok(Self {
7371
field_names,
7472
fields_vec,
75-
source_struct,
73+
source_format,
7674
data_types,
7775
offset,
7876
split_id,
@@ -96,8 +94,11 @@ impl DatagenEventGenerator {
9694
);
9795
let mut msgs = Vec::with_capacity(num_rows_to_generate as usize);
9896
'outer: for _ in 0..num_rows_to_generate {
99-
let payload = match (self.source_struct.format, self.source_struct.encode) {
100-
(SourceFormat::Plain, SourceEncode::Json) => {
97+
let payload = match (
98+
&self.source_format.protocol_config,
99+
&self.source_format.encoding_config,
100+
) {
101+
(ProtocolProperties::Plain, EncodingProperties::Json(_)) => {
101102
let mut map = serde_json::Map::with_capacity(self.fields_vec.len());
102103
for (name, field_generator) in self
103104
.field_names
@@ -225,7 +226,6 @@ mod tests {
225226
use futures::stream::StreamExt;
226227

227228
use super::*;
228-
use crate::source::SourceEncode;
229229

230230
async fn check_sequence_partition_result(
231231
split_num: u64,
@@ -266,7 +266,13 @@ mod tests {
266266
let generator = DatagenEventGenerator::new(
267267
fields_vec,
268268
vec!["c1".to_owned(), "c2".to_owned()],
269-
SourceStruct::new(SourceFormat::Plain, SourceEncode::Json),
269+
SpecificParserConfig {
270+
protocol_config: ProtocolProperties::Plain,
271+
encoding_config: EncodingProperties::Json(crate::parser::JsonProperties {
272+
use_schema_registry: false,
273+
}),
274+
key_encoding_config: None,
275+
},
270276
data_types,
271277
rows_per_second,
272278
0,

0 commit comments

Comments
 (0)