Skip to content

Commit 3a0ee3d

Browse files
authored
perf(parser): do to_ascii_lowercase only once (risingwavelabs#8718)
1 parent 669087e commit 3a0ee3d

File tree

9 files changed

+48
-183
lines changed

9 files changed

+48
-183
lines changed

ci/workflows/pull-request.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ steps:
312312
# files: "*-junit.xml"
313313
# format: "junit"
314314
- ./ci/plugins/upload-failure-logs
315-
timeout_in_minutes: 15
315+
timeout_in_minutes: 18
316316
retry: *auto-retry
317317

318318
- label: "misc check"

src/connector/src/parser/avro/parser.rs

Lines changed: 10 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -462,86 +462,16 @@ mod test {
462462

463463
fn build_rw_columns() -> Vec<SourceColumnDesc> {
464464
vec![
465-
SourceColumnDesc {
466-
name: "id".to_string(),
467-
data_type: DataType::Int32,
468-
column_id: ColumnId::from(0),
469-
is_row_id: false,
470-
is_meta: false,
471-
fields: vec![],
472-
},
473-
SourceColumnDesc {
474-
name: "sequence_id".to_string(),
475-
data_type: DataType::Int64,
476-
column_id: ColumnId::from(1),
477-
is_row_id: false,
478-
is_meta: false,
479-
fields: vec![],
480-
},
481-
SourceColumnDesc {
482-
name: "name".to_string(),
483-
data_type: DataType::Varchar,
484-
column_id: ColumnId::from(2),
485-
is_row_id: false,
486-
is_meta: false,
487-
fields: vec![],
488-
},
489-
SourceColumnDesc {
490-
name: "score".to_string(),
491-
data_type: DataType::Float32,
492-
column_id: ColumnId::from(3),
493-
is_row_id: false,
494-
is_meta: false,
495-
fields: vec![],
496-
},
497-
SourceColumnDesc {
498-
name: "avg_score".to_string(),
499-
data_type: DataType::Float64,
500-
column_id: ColumnId::from(4),
501-
is_row_id: false,
502-
is_meta: false,
503-
fields: vec![],
504-
},
505-
SourceColumnDesc {
506-
name: "is_lasted".to_string(),
507-
data_type: DataType::Boolean,
508-
column_id: ColumnId::from(5),
509-
is_row_id: false,
510-
is_meta: false,
511-
fields: vec![],
512-
},
513-
SourceColumnDesc {
514-
name: "entrance_date".to_string(),
515-
data_type: DataType::Date,
516-
column_id: ColumnId::from(6),
517-
is_row_id: false,
518-
is_meta: false,
519-
fields: vec![],
520-
},
521-
SourceColumnDesc {
522-
name: "birthday".to_string(),
523-
data_type: DataType::Timestamp,
524-
column_id: ColumnId::from(7),
525-
is_row_id: false,
526-
is_meta: false,
527-
fields: vec![],
528-
},
529-
SourceColumnDesc {
530-
name: "anniversary".to_string(),
531-
data_type: DataType::Timestamp,
532-
column_id: ColumnId::from(8),
533-
is_row_id: false,
534-
is_meta: false,
535-
fields: vec![],
536-
},
537-
SourceColumnDesc {
538-
name: "passed".to_string(),
539-
data_type: DataType::Interval,
540-
column_id: ColumnId::from(9),
541-
is_row_id: false,
542-
is_meta: false,
543-
fields: vec![],
544-
},
465+
SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)),
466+
SourceColumnDesc::simple("sequence_id", DataType::Int64, ColumnId::from(1)),
467+
SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(2)),
468+
SourceColumnDesc::simple("score", DataType::Float32, ColumnId::from(3)),
469+
SourceColumnDesc::simple("avg_score", DataType::Float64, ColumnId::from(4)),
470+
SourceColumnDesc::simple("is_lasted", DataType::Boolean, ColumnId::from(5)),
471+
SourceColumnDesc::simple("entrance_date", DataType::Date, ColumnId::from(6)),
472+
SourceColumnDesc::simple("birthday", DataType::Timestamp, ColumnId::from(7)),
473+
SourceColumnDesc::simple("anniversary", DataType::Timestamp, ColumnId::from(8)),
474+
SourceColumnDesc::simple("passed", DataType::Interval, ColumnId::from(9)),
545475
]
546476
}
547477

src/connector/src/parser/canal/simd_json_parser.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl CanalJsonParser {
9696
writer.insert(|column| {
9797
cannal_simd_json_parse_value(
9898
&column.data_type,
99-
v.get(column.name.to_ascii_lowercase().as_str()),
99+
v.get(column.name_in_lower_case.as_str()),
100100
)
101101
})
102102
})
@@ -135,15 +135,14 @@ impl CanalJsonParser {
135135
// in origin canal, old only contains the changed columns but data
136136
// contains all columns.
137137
// in ticdc, old contains all fields
138-
let col_name_lc = column.name.to_ascii_lowercase();
139-
let before_value = before
140-
.get(col_name_lc.as_str())
141-
.or_else(|| after.get(col_name_lc.as_str()));
138+
let col_name_lc = column.name_in_lower_case.as_str();
139+
let before_value =
140+
before.get(col_name_lc).or_else(|| after.get(col_name_lc));
142141
let before =
143142
cannal_simd_json_parse_value(&column.data_type, before_value)?;
144143
let after = cannal_simd_json_parse_value(
145144
&column.data_type,
146-
after.get(col_name_lc.as_str()),
145+
after.get(col_name_lc),
147146
)?;
148147
Ok((before, after))
149148
})
@@ -169,7 +168,7 @@ impl CanalJsonParser {
169168
writer.delete(|column| {
170169
cannal_simd_json_parse_value(
171170
&column.data_type,
172-
v.get(column.name.to_ascii_lowercase().as_str()),
171+
v.get(column.name_in_lower_case.as_str()),
173172
)
174173
})
175174
})

src/connector/src/parser/debezium/simd_json_parser.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,11 @@ impl DebeziumJsonParser {
9797
writer.update(|column| {
9898
let before = simd_json_parse_value(
9999
&column.data_type,
100-
before.get(column.name.to_ascii_lowercase().as_str()),
100+
before.get(column.name_in_lower_case.as_str()),
101101
)?;
102102
let after = simd_json_parse_value(
103103
&column.data_type,
104-
after.get(column.name.to_ascii_lowercase().as_str()),
104+
after.get(column.name_in_lower_case.as_str()),
105105
)?;
106106

107107
Ok((before, after))
@@ -120,7 +120,7 @@ impl DebeziumJsonParser {
120120
writer.insert(|column| {
121121
simd_json_parse_value(
122122
&column.data_type,
123-
after.get(column.name.to_ascii_lowercase().as_str()),
123+
after.get(column.name_in_lower_case.as_str()),
124124
)
125125
.map_err(Into::into)
126126
})
@@ -138,7 +138,7 @@ impl DebeziumJsonParser {
138138
writer.delete(|column| {
139139
simd_json_parse_value(
140140
&column.data_type,
141-
before.get(column.name.to_ascii_lowercase().as_str()),
141+
before.get(column.name_in_lower_case.as_str()),
142142
)
143143
.map_err(Into::into)
144144
})

src/connector/src/parser/json_parser.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,11 @@ impl JsonParser {
6969
writer: &mut SourceStreamChunkRowWriter<'_>,
7070
) -> Result<WriteGuard> {
7171
writer.insert(|desc| {
72-
simd_json_parse_value(
73-
&desc.data_type,
74-
value.get(desc.name.to_ascii_lowercase().as_str()),
75-
)
76-
.map_err(|e| {
77-
tracing::error!("failed to process value ({}): {}", value, e);
78-
e.into()
79-
})
72+
simd_json_parse_value(&desc.data_type, value.get(desc.name_in_lower_case.as_str()))
73+
.map_err(|e| {
74+
tracing::error!("failed to process value ({}): {}", value, e);
75+
e.into()
76+
})
8077
})
8178
}
8279

@@ -119,7 +116,7 @@ impl JsonParser {
119116
let fill_fn = |desc: &SourceColumnDesc| {
120117
simd_json_parse_value(
121118
&desc.data_type,
122-
value.get(desc.name.to_ascii_lowercase().as_str()),
119+
value.get(desc.name_in_lower_case.as_str()),
123120
)
124121
.map_err(|e| {
125122
tracing::error!(

src/connector/src/parser/maxwell/simd_json_parser.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl MaxwellParser {
7171
writer.insert(|column| {
7272
simd_json_parse_value(
7373
&column.data_type,
74-
after.get(column.name.to_ascii_lowercase().as_str()),
74+
after.get(column.name_in_lower_case.as_str()),
7575
)
7676
.map_err(Into::into)
7777
})
@@ -90,13 +90,10 @@ impl MaxwellParser {
9090

9191
writer.update(|column| {
9292
// old only contains the changed columns but data contains all columns.
93-
let col_name_lc = column.name.to_ascii_lowercase();
94-
let before_value = before
95-
.get(col_name_lc.as_str())
96-
.or_else(|| after.get(col_name_lc.as_str()));
93+
let col_name_lc = column.name_in_lower_case.as_str();
94+
let before_value = before.get(col_name_lc).or_else(|| after.get(col_name_lc));
9795
let before = simd_json_parse_value(&column.data_type, before_value)?;
98-
let after =
99-
simd_json_parse_value(&column.data_type, after.get(col_name_lc.as_str()))?;
96+
let after = simd_json_parse_value(&column.data_type, after.get(col_name_lc))?;
10097
Ok((before, after))
10198
})
10299
}
@@ -107,7 +104,7 @@ impl MaxwellParser {
107104
writer.delete(|column| {
108105
simd_json_parse_value(
109106
&column.data_type,
110-
before.get(column.name.to_ascii_lowercase().as_str()),
107+
before.get(column.name_in_lower_case.as_str()),
111108
)
112109
.map_err(Into::into)
113110
})

src/connector/src/parser/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ impl SourceStreamChunkRowWriter<'_> {
197197
&mut self,
198198
mut f: impl FnMut(&SourceColumnDesc) -> Result<A::Output>,
199199
) -> Result<WriteGuard> {
200-
let mut modify_col = vec![];
200+
let mut modify_col = Vec::with_capacity(self.descs.len());
201201
self.descs
202202
.iter()
203203
.zip_eq(self.builders.iter_mut())

src/connector/src/source/manager.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use risingwave_common::types::DataType;
2222
#[derive(Clone, Debug)]
2323
pub struct SourceColumnDesc {
2424
pub name: String,
25+
pub name_in_lower_case: String,
2526
pub data_type: DataType,
2627
pub column_id: ColumnId,
2728
pub fields: Vec<ColumnDesc>,
@@ -39,8 +40,11 @@ impl SourceColumnDesc {
3940
!matches!(data_type, DataType::List { .. } | DataType::Struct(..)),
4041
"called `SourceColumnDesc::simple` with a composite type."
4142
);
43+
let name = name.into();
44+
let name_in_lower_case = name.to_ascii_lowercase();
4245
Self {
43-
name: name.into(),
46+
name,
47+
name_in_lower_case,
4448
data_type,
4549
column_id,
4650
fields: vec![],
@@ -60,6 +64,7 @@ impl From<&ColumnDesc> for SourceColumnDesc {
6064
let is_meta = c.name.starts_with("_rw_kafka_timestamp");
6165
Self {
6266
name: c.name.clone(),
67+
name_in_lower_case: c.name.to_ascii_lowercase(),
6368
data_type: c.data_type.clone(),
6469
column_id: c.column_id,
6570
fields: c.field_descs.clone(),

src/source/benches/json_parser.rs

Lines changed: 9 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -53,78 +53,15 @@ fn generate_all_json() -> Vec<Vec<u8>> {
5353

5454
fn get_descs() -> Vec<SourceColumnDesc> {
5555
vec![
56-
SourceColumnDesc {
57-
name: "i32".to_string(),
58-
data_type: DataType::Int32,
59-
column_id: ColumnId::from(0),
60-
is_row_id: false,
61-
is_meta: false,
62-
fields: vec![],
63-
},
64-
SourceColumnDesc {
65-
name: "bool".to_string(),
66-
data_type: DataType::Boolean,
67-
column_id: ColumnId::from(2),
68-
is_row_id: false,
69-
is_meta: false,
70-
fields: vec![],
71-
},
72-
SourceColumnDesc {
73-
name: "i16".to_string(),
74-
data_type: DataType::Int16,
75-
column_id: ColumnId::from(3),
76-
is_row_id: false,
77-
is_meta: false,
78-
fields: vec![],
79-
},
80-
SourceColumnDesc {
81-
name: "i64".to_string(),
82-
data_type: DataType::Int64,
83-
column_id: ColumnId::from(4),
84-
is_row_id: false,
85-
is_meta: false,
86-
fields: vec![],
87-
},
88-
SourceColumnDesc {
89-
name: "f32".to_string(),
90-
data_type: DataType::Float32,
91-
column_id: ColumnId::from(5),
92-
is_row_id: false,
93-
is_meta: false,
94-
fields: vec![],
95-
},
96-
SourceColumnDesc {
97-
name: "f64".to_string(),
98-
data_type: DataType::Float64,
99-
column_id: ColumnId::from(6),
100-
is_row_id: false,
101-
is_meta: false,
102-
fields: vec![],
103-
},
104-
SourceColumnDesc {
105-
name: "varchar".to_string(),
106-
data_type: DataType::Varchar,
107-
column_id: ColumnId::from(7),
108-
is_row_id: false,
109-
is_meta: false,
110-
fields: vec![],
111-
},
112-
SourceColumnDesc {
113-
name: "date".to_string(),
114-
data_type: DataType::Date,
115-
column_id: ColumnId::from(8),
116-
is_row_id: false,
117-
is_meta: false,
118-
fields: vec![],
119-
},
120-
SourceColumnDesc {
121-
name: "timestamp".to_string(),
122-
data_type: DataType::Timestamp,
123-
column_id: ColumnId::from(9),
124-
is_row_id: false,
125-
is_meta: false,
126-
fields: vec![],
127-
},
56+
SourceColumnDesc::simple("i32", DataType::Int32, ColumnId::from(0)),
57+
SourceColumnDesc::simple("bool", DataType::Boolean, ColumnId::from(2)),
58+
SourceColumnDesc::simple("i16", DataType::Int16, ColumnId::from(3)),
59+
SourceColumnDesc::simple("i64", DataType::Int64, ColumnId::from(4)),
60+
SourceColumnDesc::simple("f32", DataType::Float32, ColumnId::from(5)),
61+
SourceColumnDesc::simple("f64", DataType::Float64, ColumnId::from(6)),
62+
SourceColumnDesc::simple("varchar", DataType::Varchar, ColumnId::from(7)),
63+
SourceColumnDesc::simple("date", DataType::Date, ColumnId::from(8)),
64+
SourceColumnDesc::simple("timestamp", DataType::Timestamp, ColumnId::from(9)),
12865
]
12966
}
13067

0 commit comments

Comments
 (0)