Skip to content

Commit f6ccfd5

Browse files
authored
perf(connector): use Vec<u8> instead of Bytes and &[u8] (risingwavelabs#8732)
1 parent 0e8d81f commit f6ccfd5

File tree

20 files changed

+145
-113
lines changed

20 files changed

+145
-113
lines changed

src/connector/src/macros.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ macro_rules! impl_common_parser_logic {
200200

201201
let old_op_num = builder.op_num();
202202

203-
if let Err(e) = self.parse_inner(content.as_ref(), builder.row_writer())
203+
if let Err(e) = self.parse_inner(content, builder.row_writer())
204204
.await
205205
{
206206
tracing::warn!("message parsing failed {}, skipping", e.to_string());

src/connector/src/parser/avro/parser.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::borrow::Cow;
1516
use std::collections::HashMap;
1617
use std::fmt::Debug;
1718
use std::sync::Arc;
@@ -175,7 +176,7 @@ impl AvroParser {
175176

176177
pub(crate) async fn parse_inner(
177178
&self,
178-
payload: &[u8],
179+
payload: Vec<u8>,
179180
mut writer: SourceStreamChunkRowWriter<'_>,
180181
) -> Result<WriteGuard> {
181182
enum Op {
@@ -184,7 +185,7 @@ impl AvroParser {
184185
}
185186

186187
let (_payload, op) = if self.is_enable_upsert() {
187-
let msg: UpsertMessage<'_> = bincode::deserialize(payload).map_err(|e| {
188+
let msg: UpsertMessage<'_> = bincode::deserialize(&payload).map_err(|e| {
188189
RwError::from(ProtocolError(format!(
189190
"extract payload err {:?}, you may need to check the 'upsert' parameter",
190191
e
@@ -196,7 +197,7 @@ impl AvroParser {
196197
(msg.primary_key, Op::Delete)
197198
}
198199
} else {
199-
(payload.into(), Op::Insert)
200+
(Cow::from(&payload), Op::Insert)
200201
};
201202

202203
// parse payload to avro value
@@ -212,7 +213,7 @@ impl AvroParser {
212213
from_avro_datum(writer_schema.as_ref(), &mut raw_payload, reader_schema)
213214
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?
214215
} else {
215-
let mut reader = Reader::with_schema(&self.schema, payload as &[u8])
216+
let mut reader = Reader::with_schema(&self.schema, &payload as &[u8])
216217
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
217218
match reader.next() {
218219
Some(Ok(v)) => v,
@@ -237,7 +238,7 @@ impl AvroParser {
237238
from_avro_value(tuple.1.clone(), field_schema).map_err(|e| {
238239
tracing::error!(
239240
"failed to process value ({}): {}",
240-
String::from_utf8_lossy(payload),
241+
String::from_utf8_lossy(&payload),
241242
e
242243
);
243244
e
@@ -271,7 +272,7 @@ impl AvroParser {
271272
.map_err(|e| {
272273
tracing::error!(
273274
"failed to process value ({}): {}",
274-
String::from_utf8_lossy(payload),
275+
String::from_utf8_lossy(&payload),
275276
e
276277
);
277278
e
@@ -388,10 +389,7 @@ mod test {
388389
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1);
389390
{
390391
let writer = builder.row_writer();
391-
avro_parser
392-
.parse_inner(&input_data[..], writer)
393-
.await
394-
.unwrap();
392+
avro_parser.parse_inner(input_data, writer).await.unwrap();
395393
}
396394
let chunk = builder.finish();
397395
let (op, row) = chunk.rows().next().unwrap();

src/connector/src/parser/canal/simd_json_parser.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,10 @@ impl CanalJsonParser {
5454
#[allow(clippy::unused_async)]
5555
pub async fn parse_inner(
5656
&self,
57-
payload: &[u8],
57+
mut payload: Vec<u8>,
5858
mut writer: SourceStreamChunkRowWriter<'_>,
5959
) -> Result<WriteGuard> {
60-
let mut payload_mut = payload.to_vec();
61-
let event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload_mut)
60+
let event: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload)
6261
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
6362

6463
let is_ddl = event.get(IS_DDL).and_then(|v| v.as_bool()).ok_or_else(|| {
@@ -263,7 +262,7 @@ mod tests {
263262
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
264263

265264
let writer = builder.row_writer();
266-
parser.parse_inner(payload, writer).await.unwrap();
265+
parser.parse_inner(payload.to_vec(), writer).await.unwrap();
267266

268267
let chunk = builder.finish();
269268

@@ -340,7 +339,7 @@ mod tests {
340339
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
341340

342341
let writer = builder.row_writer();
343-
parser.parse_inner(payload, writer).await.unwrap();
342+
parser.parse_inner(payload.to_vec(), writer).await.unwrap();
344343

345344
let chunk = builder.finish();
346345

src/connector/src/parser/csv_parser.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,10 @@ impl CsvParser {
113113
#[allow(clippy::unused_async)]
114114
pub async fn parse_inner(
115115
&mut self,
116-
payload: &[u8],
116+
payload: Vec<u8>,
117117
mut writer: SourceStreamChunkRowWriter<'_>,
118118
) -> Result<WriteGuard> {
119-
let mut fields = self.read_row(payload)?;
119+
let mut fields = self.read_row(&payload)?;
120120
if let Some(headers) = &mut self.headers {
121121
if headers.is_empty() {
122122
*headers = fields;
@@ -161,7 +161,7 @@ mod tests {
161161
use crate::parser::SourceStreamChunkBuilder;
162162
#[tokio::test]
163163
async fn test_csv_without_headers() {
164-
let data = [
164+
let data = vec![
165165
r#"1,a,2"#,
166166
r#""15541","a,1,1,",4"#,
167167
r#"0,"""0",0"#,
@@ -185,7 +185,7 @@ mod tests {
185185
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4);
186186
for item in data {
187187
parser
188-
.parse_inner(item.as_bytes(), builder.row_writer())
188+
.parse_inner(item.as_bytes().to_vec(), builder.row_writer())
189189
.await
190190
.unwrap();
191191
}
@@ -292,7 +292,7 @@ mod tests {
292292
let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4);
293293
for item in data {
294294
let _ = parser
295-
.parse_inner(item.as_bytes(), builder.row_writer())
295+
.parse_inner(item.as_bytes().to_vec(), builder.row_writer())
296296
.await;
297297
}
298298
let chunk = builder.finish();

src/connector/src/parser/debezium/avro_parser.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,10 @@ impl DebeziumAvroParser {
189189

190190
pub(crate) async fn parse_inner(
191191
&self,
192-
payload: &[u8],
192+
payload: Vec<u8>,
193193
mut writer: SourceStreamChunkRowWriter<'_>,
194194
) -> Result<WriteGuard> {
195-
let (schema_id, mut raw_payload) = extract_schema_id(payload)?;
195+
let (schema_id, mut raw_payload) = extract_schema_id(&payload)?;
196196
let writer_schema = self.schema_resolver.get(schema_id).await?;
197197

198198
let avro_value = from_avro_datum(writer_schema.as_ref(), &mut raw_payload, None)
@@ -296,7 +296,7 @@ mod tests {
296296
async fn parse_one(
297297
parser: DebeziumAvroParser,
298298
columns: Vec<SourceColumnDesc>,
299-
payload: &[u8],
299+
payload: Vec<u8>,
300300
) -> Vec<(Op, OwnedRow)> {
301301
let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2);
302302
{
@@ -442,7 +442,7 @@ mod tests {
442442

443443
let parser =
444444
DebeziumAvroParser::new(columns.clone(), config, Arc::new(Default::default()))?;
445-
let [(op, row)]: [_; 1] = parse_one(parser, columns, DEBEZIUM_AVRO_DATA)
445+
let [(op, row)]: [_; 1] = parse_one(parser, columns, DEBEZIUM_AVRO_DATA.to_vec())
446446
.await
447447
.try_into()
448448
.unwrap();

0 commit comments

Comments (0)