Skip to content

Commit 2f18c5c

Browse files
committed
add default plain json config
1 parent 088fc33 commit 2f18c5c

File tree

2 files changed

+35
-14
lines changed

2 files changed

+35
-14
lines changed

src/connector/src/parser/json_parser.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -232,16 +232,6 @@ mod tests {
232232
]
233233
}
234234

235-
fn get_json_config() -> SpecificParserConfig {
236-
SpecificParserConfig {
237-
key_encoding_config: None,
238-
encoding_config: EncodingProperties::Json(JsonProperties {
239-
use_schema_registry: false,
240-
}),
241-
protocol_config: ProtocolProperties::Plain,
242-
}
243-
}
244-
245235
async fn test_json_parser(get_payload: fn() -> Vec<Vec<u8>>) {
246236
let descs = vec![
247237
SourceColumnDesc::simple("i32", DataType::Int32, 0.into()),
@@ -256,7 +246,12 @@ mod tests {
256246
SourceColumnDesc::simple("decimal", DataType::Decimal, 10.into()),
257247
];
258248

259-
let parser = JsonParser::new(get_json_config(), descs.clone(), Default::default()).unwrap();
249+
let parser = JsonParser::new(
250+
SpecificParserConfig::DEFAULT_PLAIN_JSON,
251+
descs.clone(),
252+
Default::default(),
253+
)
254+
.unwrap();
260255

261256
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
262257

@@ -355,7 +350,12 @@ mod tests {
355350
SourceColumnDesc::simple("v2", DataType::Int16, 1.into()),
356351
SourceColumnDesc::simple("v3", DataType::Varchar, 2.into()),
357352
];
358-
let parser = JsonParser::new(get_json_config(), descs.clone(), Default::default()).unwrap();
353+
let parser = JsonParser::new(
354+
SpecificParserConfig::DEFAULT_PLAIN_JSON,
355+
descs.clone(),
356+
Default::default(),
357+
)
358+
.unwrap();
359359
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 3);
360360

361361
// Parse a correct record.
@@ -421,7 +421,12 @@ mod tests {
421421
.map(SourceColumnDesc::from)
422422
.collect_vec();
423423

424-
let parser = JsonParser::new(get_json_config(), descs.clone(), Default::default()).unwrap();
424+
let parser = JsonParser::new(
425+
SpecificParserConfig::DEFAULT_PLAIN_JSON,
426+
descs.clone(),
427+
Default::default(),
428+
)
429+
.unwrap();
425430
let payload = br#"
426431
{
427432
"data": {
@@ -488,7 +493,12 @@ mod tests {
488493
.map(SourceColumnDesc::from)
489494
.collect_vec();
490495

491-
let parser = JsonParser::new(get_json_config(), descs.clone(), Default::default()).unwrap();
496+
let parser = JsonParser::new(
497+
SpecificParserConfig::DEFAULT_PLAIN_JSON,
498+
descs.clone(),
499+
Default::default(),
500+
)
501+
.unwrap();
492502
let payload = br#"
493503
{
494504
"struct": "{\"varchar\": \"varchar\", \"boolean\": true}"

src/connector/src/parser/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,17 @@ pub struct SpecificParserConfig {
678678
pub protocol_config: ProtocolProperties,
679679
}
680680

681+
impl SpecificParserConfig {
682+
// for test only
683+
pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
684+
key_encoding_config: None,
685+
encoding_config: EncodingProperties::Json(JsonProperties {
686+
use_schema_registry: false,
687+
}),
688+
protocol_config: ProtocolProperties::Plain,
689+
};
690+
}
691+
681692
#[derive(Debug, Clone, Default)]
682693
pub struct SchemaRegistryAuth {
683694
username: Option<String>,

0 commit comments

Comments
 (0)