
Commit adb8651

refactor(source): rename some source parser related types and methods (#19863)
Signed-off-by: Richard Chien <[email protected]>
1 parent f3d8e0d commit adb8651
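
At a glance, the renames in this commit make source-related names read consistently as source → message → chunk (a summary distilled from the diffs below):

- `BoxSourceStream` → `BoxSourceMessageStream` — a stream of `SourceMessage` batches
- `BoxChunkSourceStream` → `BoxSourceChunkStream` — a stream of parsed `StreamChunk`s
- `ChunkSourceStream` → `SourceChunkStream` — the manually expanded trait alias
- `ByteStreamSourceParser::into_stream` and `ByteStreamSourceParserImpl::into_stream` → `parse_stream`
- `into_chunk_stream_inner` → `parse_message_stream`
- `MessageMeta::meta` → `MessageMeta::source_meta`

`SplitReader::into_stream` keeps its name; only its return type changes to `BoxSourceChunkStream`.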

28 files changed: +113 -100 lines changed


src/connector/benches/json_parser_case_insensitive.rs

Lines changed: 1 addition & 1 deletion
@@ -89,7 +89,7 @@ fn create_parser(chunk_size: usize, chunk_num: usize, mode: &str) -> (Parser, In
 
 async fn parse(parser: Parser, input: Input) {
     parser
-        .into_stream(futures::stream::iter(input.into_iter().map(Ok)).boxed())
+        .parse_stream(futures::stream::iter(input.into_iter().map(Ok)).boxed())
         .count() // consume the stream
         .await;
 }
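
The benchmark drives the parsed stream to completion with `.count()`: nothing is kept, the point is to force every chunk to actually be produced. A minimal self-contained sketch of that idiom, assuming only the `futures` crate (a plain numeric stream stands in for the parser output):

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    // `.count()` polls the stream until it ends, so all upstream work
    // (parsing, in the benchmark) really runs; the count itself is discarded.
    let n = block_on(stream::iter(0..100).count());
    assert_eq!(n, 100);
}
```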

src/connector/benches/nexmark_integration.rs

Lines changed: 4 additions & 4 deletions
@@ -30,7 +30,7 @@ use risingwave_connector::parser::{
     ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig, SpecificParserConfig,
 };
 use risingwave_connector::source::{
-    BoxChunkSourceStream, BoxSourceStream, SourceColumnDesc, SourceMessage, SourceMeta,
+    BoxSourceChunkStream, BoxSourceMessageStream, SourceColumnDesc, SourceMessage, SourceMeta,
 };
 use tracing::Level;
 use tracing_subscriber::prelude::*;
@@ -72,7 +72,7 @@ fn make_batch(use_struct: bool) -> Vec<SourceMessage> {
         .collect_vec()
 }
 
-fn make_data_stream(use_struct: bool) -> BoxSourceStream {
+fn make_data_stream(use_struct: bool) -> BoxSourceMessageStream {
     futures::future::ready(Ok(if use_struct {
         STRUCT_BATCH.clone()
     } else {
@@ -118,8 +118,8 @@ fn make_parser(use_struct: bool) -> ByteStreamSourceParserImpl {
 }
 
 fn make_stream_iter(use_struct: bool) -> impl Iterator<Item = StreamChunk> {
-    let mut stream: BoxChunkSourceStream = make_parser(use_struct)
-        .into_stream(make_data_stream(use_struct))
+    let mut stream: BoxSourceChunkStream = make_parser(use_struct)
+        .parse_stream(make_data_stream(use_struct))
         .boxed();
 
     std::iter::from_fn(move || {
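
`make_stream_iter` bridges the async chunk stream into a blocking `Iterator` with `std::iter::from_fn`, so the benchmark loop can stay synchronous. A toy version of the same bridge, assuming the `futures` crate (integers stand in for `StreamChunk`s):

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    // `.boxed()` pins the stream so `next()` can be polled repeatedly.
    let mut stream = stream::iter(vec![1, 2, 3]).boxed();
    // Each call to the closure blocks on one `next()`; `None` ends the
    // iterator, mirroring what `make_stream_iter` does with parsed chunks.
    let iter = std::iter::from_fn(move || block_on(stream.next()));
    assert_eq!(iter.collect::<Vec<_>>(), vec![1, 2, 3]);
}
```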

src/connector/src/parser/chunk_builder.rs

Lines changed: 14 additions & 6 deletions
@@ -43,6 +43,12 @@ struct Transaction {
 }
 
 /// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
+///
+/// Output chunk size is controlled by `source_ctrl_opts.chunk_size` and `source_ctrl_opts.split_txn`.
+/// During building process, it's possible that multiple chunks are built even without any explicit
+/// call to `finish_current_chunk`. This mainly happens when we find more than one records in one
+/// `SourceMessage` when parsing it. User of this builder should call `consume_ready_chunks` to consume
+/// the built chunks from time to time, to avoid the buffer from growing too large.
 pub struct SourceStreamChunkBuilder {
     column_descs: Vec<SourceColumnDesc>,
     source_ctrl_opts: SourceCtrlOpts,
@@ -299,7 +305,7 @@ impl SourceStreamChunkRowWriter<'_> {
             }
             (&SourceColumnType::Meta, _)
                 if matches!(
-                    &self.row_meta.map(|ele| ele.meta),
+                    &self.row_meta.map(|ele| ele.source_meta),
                     &Some(SourceMeta::Kafka(_) | SourceMeta::DebeziumCdc(_))
                 ) =>
             {
@@ -318,7 +324,7 @@ impl SourceStreamChunkRowWriter<'_> {
             ) => {
                 match self.row_meta {
                     Some(row_meta) => {
-                        if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.meta {
+                        if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.source_meta {
                             Ok(A::output_for(extract_cdc_meta_column(
                                 cdc_meta,
                                 col,
@@ -334,7 +340,9 @@ impl SourceStreamChunkRowWriter<'_> {
                 }
             }
             (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
-                Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(row_meta.meta))),
+                Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(
+                    row_meta.source_meta,
+                ))),
                 None => parse_field(desc), // parse from payload
             },
             (_, &Some(AdditionalColumnType::CollectionName(_))) => {
@@ -344,7 +352,7 @@ impl SourceStreamChunkRowWriter<'_> {
             (_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for(
                 self.row_meta
                     .as_ref()
-                    .and_then(|ele| extract_subject_from_meta(ele.meta))
+                    .and_then(|ele| extract_subject_from_meta(ele.source_meta))
                     .unwrap_or(None),
             )),
             (_, &Some(AdditionalColumnType::Partition(_))) => {
@@ -369,7 +377,7 @@ impl SourceStreamChunkRowWriter<'_> {
                     .as_ref()
                     .and_then(|ele| {
                         extract_header_inner_from_meta(
-                            ele.meta,
+                            ele.source_meta,
                             header_inner.inner_field.as_ref(),
                             header_inner.data_type.as_ref(),
                         )
@@ -380,7 +388,7 @@ impl SourceStreamChunkRowWriter<'_> {
             (_, &Some(AdditionalColumnType::Headers(_))) => Ok(A::output_for(
                 self.row_meta
                     .as_ref()
-                    .and_then(|ele| extract_headers_from_meta(ele.meta))
+                    .and_then(|ele| extract_headers_from_meta(ele.source_meta))
                     .unwrap_or(None),
             )),
             (_, &Some(AdditionalColumnType::Filename(_))) => {
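
The new doc comment describes a contract that is easy to misuse: chunks can become ready without any explicit `finish_current_chunk`, so the caller must drain `consume_ready_chunks` as it goes. A self-contained toy with the same shape (all names, types, and sizes here are illustrative; the real builder also tracks `SourceColumnDesc`s, transactions, and `split_txn`):

```rust
struct ToyChunkBuilder {
    chunk_size: usize,
    current: Vec<u64>,    // rows of the chunk being built
    ready: Vec<Vec<u64>>, // finished chunks not yet consumed
}

impl ToyChunkBuilder {
    fn new(chunk_size: usize) -> Self {
        Self { chunk_size, current: Vec::new(), ready: Vec::new() }
    }

    /// One source message may carry several records (e.g. a CDC batch), so a
    /// single call can leave more than one chunk ready at once.
    fn append_records(&mut self, records: impl IntoIterator<Item = u64>) {
        for record in records {
            self.current.push(record);
            if self.current.len() >= self.chunk_size {
                self.ready.push(std::mem::take(&mut self.current));
            }
        }
    }

    fn finish_current_chunk(&mut self) {
        if !self.current.is_empty() {
            self.ready.push(std::mem::take(&mut self.current));
        }
    }

    fn consume_ready_chunks(&mut self) -> impl Iterator<Item = Vec<u64>> + '_ {
        self.ready.drain(..)
    }
}

fn main() {
    let mut builder = ToyChunkBuilder::new(2);
    builder.append_records(0..5); // two chunks become ready, one row stays pending
    assert_eq!(builder.consume_ready_chunks().count(), 2); // drain to bound the buffer
    builder.finish_current_chunk(); // flush the trailing partial chunk
    assert_eq!(builder.consume_ready_chunks().count(), 1);
}
```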

src/connector/src/parser/mod.rs

Lines changed: 29 additions & 26 deletions
@@ -48,7 +48,7 @@ use crate::parser::maxwell::MaxwellParser;
 use crate::schema::schema_registry::SchemaRegistryAuth;
 use crate::source::monitor::GLOBAL_SOURCE_METRICS;
 use crate::source::{
-    BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
+    BoxSourceMessageStream, SourceChunkStream, SourceColumnDesc, SourceColumnType, SourceContext,
     SourceContextRef, SourceCtrlOpts, SourceMeta,
 };
 
@@ -85,7 +85,7 @@ pub use unified::{AccessError, AccessResult};
 /// Extracted from the `SourceMessage`.
 #[derive(Clone, Copy, Debug)]
 pub struct MessageMeta<'a> {
-    meta: &'a SourceMeta,
+    source_meta: &'a SourceMeta,
     split_id: &'a str,
     offset: &'a str,
 }
@@ -102,15 +102,15 @@ impl<'a> MessageMeta<'a> {
             // Extract the offset from the meta data.
             SourceColumnType::Offset => Some(self.offset.into()),
             // Extract custom meta data per connector.
-            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => {
+            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
                 assert_eq!(
                     desc.name.as_str(),
                     KAFKA_TIMESTAMP_COLUMN_NAME,
                     "unexpected kafka meta column name"
                 );
                 kafka_meta.extract_timestamp()
             }
-            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.meta => {
+            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
                 assert_eq!(
                     desc.name.as_str(),
                     TABLE_NAME_COLUMN_NAME,
@@ -161,7 +161,7 @@ pub enum ParserFormat {
 /// `ByteStreamSourceParser` is the entrypoint abstraction for parsing messages.
 /// It consumes bytes of one individual message and produces parsed records.
 ///
-/// It's used by [`ByteStreamSourceParserImpl::into_stream`]. `pub` is for benchmark only.
+/// It's used by [`ByteStreamSourceParserImpl::parse_stream`]. `pub` is for benchmark only.
 pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
     /// The column descriptors of the output chunk.
     fn columns(&self) -> &[SourceColumnDesc];
@@ -202,35 +202,35 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
 
 #[easy_ext::ext(SourceParserIntoStreamExt)]
 impl<P: ByteStreamSourceParser> P {
-    /// Parse a stream of vectors of `SourceMessage` into a stream of [`StreamChunk`].
+    /// Parse a `SourceMessage` stream into a [`StreamChunk`] stream.
     ///
     /// # Arguments
    ///
-    /// - `msg_stream`: A stream of vectors of `SourceMessage`.
+    /// - `msg_stream`: A stream of batches of `SourceMessage`.
     ///
     /// # Returns
     ///
-    /// A [`ChunkSourceStream`] which is a stream of parsed chunks. Each of the parsed chunks
-    /// are guaranteed to have less than or equal to `source_ctrl_opts.chunk_size` rows, unless
-    /// there's a large transaction and `source_ctrl_opts.split_txn` is false.
-    pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream {
+    /// A [`SourceChunkStream`] of parsed chunks. Each of the parsed chunks are guaranteed
+    /// to have less than or equal to `source_ctrl_opts.chunk_size` rows, unless there's a
+    /// large transaction and `source_ctrl_opts.split_txn` is false.
+    pub fn parse_stream(self, msg_stream: BoxSourceMessageStream) -> impl SourceChunkStream {
         let actor_id = self.source_ctx().actor_id;
         let source_id = self.source_ctx().source_id.table_id();
 
         // The stream will be long-lived. We use `instrument_with` here to create
         // a new span for the polling of each chunk.
         let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
-        into_chunk_stream_inner(self, msg_stream, source_ctrl_opts)
+        parse_message_stream(self, msg_stream, source_ctrl_opts)
            .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
     }
 }
 
 // TODO: when upsert is disabled, how to filter those empty payload
 // Currently, an err is returned for non upsert with empty payload
 #[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
-async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
+async fn parse_message_stream<P: ByteStreamSourceParser>(
     mut parser: P,
-    msg_stream: BoxSourceStream,
+    msg_stream: BoxSourceMessageStream,
     source_ctrl_opts: SourceCtrlOpts,
 ) {
     let mut chunk_builder =
@@ -266,7 +266,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
             "handling a heartbeat message"
         );
         chunk_builder.heartbeat(MessageMeta {
-            meta: &heartbeat_msg.meta,
+            source_meta: &heartbeat_msg.meta,
             split_id: &heartbeat_msg.split_id,
             offset: &heartbeat_msg.offset,
         });
@@ -312,7 +312,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
                     msg.key,
                     msg.payload,
                     chunk_builder.row_writer().with_meta(MessageMeta {
-                        meta: &msg.meta,
+                        source_meta: &msg.meta,
                         split_id: &msg.split_id,
                         offset: &msg.offset,
                     }),
@@ -428,16 +428,19 @@ pub enum ByteStreamSourceParserImpl {
 
 impl ByteStreamSourceParserImpl {
     /// Converts `SourceMessage` vec stream into [`StreamChunk`] stream.
-    pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream + Unpin {
+    pub fn parse_stream(
+        self,
+        msg_stream: BoxSourceMessageStream,
+    ) -> impl SourceChunkStream + Unpin {
         #[auto_enum(futures03::Stream)]
         let stream = match self {
-            Self::Csv(parser) => parser.into_stream(msg_stream),
-            Self::Debezium(parser) => parser.into_stream(msg_stream),
-            Self::DebeziumMongoJson(parser) => parser.into_stream(msg_stream),
-            Self::Maxwell(parser) => parser.into_stream(msg_stream),
-            Self::CanalJson(parser) => parser.into_stream(msg_stream),
-            Self::Plain(parser) => parser.into_stream(msg_stream),
-            Self::Upsert(parser) => parser.into_stream(msg_stream),
+            Self::Csv(parser) => parser.parse_stream(msg_stream),
+            Self::Debezium(parser) => parser.parse_stream(msg_stream),
+            Self::DebeziumMongoJson(parser) => parser.parse_stream(msg_stream),
+            Self::Maxwell(parser) => parser.parse_stream(msg_stream),
+            Self::CanalJson(parser) => parser.parse_stream(msg_stream),
+            Self::Plain(parser) => parser.parse_stream(msg_stream),
+            Self::Upsert(parser) => parser.parse_stream(msg_stream),
         };
         Box::pin(stream)
     }
@@ -513,7 +516,7 @@ pub mod test_utils {
             })
             .collect_vec();
 
-        self.into_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
+        self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
             .next()
             .await
             .unwrap()
@@ -531,7 +534,7 @@ pub mod test_utils {
             })
             .collect_vec();
 
-        self.into_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
+        self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
             .next()
             .await
             .unwrap()
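
`parse_stream` is attached to every parser via `#[easy_ext::ext(SourceParserIntoStreamExt)]`. That macro expands to roughly an extension trait with a blanket impl; here is the pattern written by hand as a self-contained sketch (toy types, not the real signatures):

```rust
/// Stand-in for `ByteStreamSourceParser`.
trait ByteParser: Sized {
    fn parse_one(&mut self, msg: &str) -> String;
}

/// Roughly what `#[easy_ext::ext]` generates: a trait holding the extension
/// method plus a blanket impl covering every parser.
trait ParseStreamExt: ByteParser {
    /// Consumes the parser, like `parse_stream(self, msg_stream)` in the diff.
    fn parse_stream(mut self, msgs: Vec<&str>) -> Vec<String> {
        msgs.into_iter().map(|m| self.parse_one(m)).collect()
    }
}
impl<P: ByteParser> ParseStreamExt for P {}

struct UpperParser;
impl ByteParser for UpperParser {
    fn parse_one(&mut self, msg: &str) -> String {
        msg.to_uppercase()
    }
}

fn main() {
    // The method is callable on any parser once the extension trait is in scope.
    assert_eq!(UpperParser.parse_stream(vec!["a", "b"]), vec!["A", "B"]);
}
```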

src/connector/src/parser/plain_parser.rs

Lines changed: 5 additions & 5 deletions
@@ -95,7 +95,7 @@ impl PlainParser {
         // plain parser also used in the shared cdc source,
         // we need to handle transaction metadata and schema change messages here
         if let Some(msg_meta) = writer.row_meta()
-            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.meta
+            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
             && let Some(data) = payload
         {
             match cdc_meta.msg_type {
@@ -252,7 +252,7 @@ mod tests {
         let mut transactional = false;
         // for untransactional source, we expect emit a chunk for each message batch
         let message_stream = source_message_stream(transactional);
-        let chunk_stream = crate::parser::into_chunk_stream_inner(
+        let chunk_stream = crate::parser::parse_message_stream(
            parser,
             message_stream.boxed(),
             SourceCtrlOpts::for_test(),
@@ -293,7 +293,7 @@ mod tests {
         // for transactional source, we expect emit a single chunk for the transaction
         transactional = true;
         let message_stream = source_message_stream(transactional);
-        let chunk_stream = crate::parser::into_chunk_stream_inner(
+        let chunk_stream = crate::parser::parse_message_stream(
             parser,
             message_stream.boxed(),
             SourceCtrlOpts::for_test(),
@@ -426,7 +426,7 @@ mod tests {
             cdc_message::CdcMessageType::TransactionMeta,
         ));
         let msg_meta = MessageMeta {
-            meta: &cdc_meta,
+            source_meta: &cdc_meta,
             split_id: "1001",
             offset: "",
         };
@@ -500,7 +500,7 @@ mod tests {
             cdc_message::CdcMessageType::SchemaChange,
         ));
         let msg_meta = MessageMeta {
-            meta: &cdc_meta,
+            source_meta: &cdc_meta,
             split_id: "1001",
             offset: "",
         };

src/connector/src/source/base.rs

Lines changed: 8 additions & 6 deletions
@@ -353,20 +353,22 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct>
     Ok(SourceStruct::new(format, encode))
 }
 
-/// Stream of [`SourceMessage`].
-pub type BoxSourceStream = BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
+/// Stream of [`SourceMessage`]. Messages flow through the stream in the unit of a batch.
+pub type BoxSourceMessageStream =
+    BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>;
+/// Stream of [`StreamChunk`]s parsed from the messages from the external source.
+pub type BoxSourceChunkStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
 
 // Manually expand the trait alias to improve IDE experience.
-pub trait ChunkSourceStream:
+pub trait SourceChunkStream:
     Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
 {
 }
-impl<T> ChunkSourceStream for T where
+impl<T> SourceChunkStream for T where
     T: Stream<Item = crate::error::ConnectorResult<StreamChunk>> + Send + 'static
 {
 }
 
-pub type BoxChunkSourceStream = BoxStream<'static, crate::error::ConnectorResult<StreamChunk>>;
 pub type BoxTryStream<M> = BoxStream<'static, crate::error::ConnectorResult<M>>;
 
 /// [`SplitReader`] is a new abstraction of the external connector read interface which is
@@ -385,7 +387,7 @@ pub trait SplitReader: Sized + Send {
         columns: Option<Vec<Column>>,
     ) -> crate::error::ConnectorResult<Self>;
 
-    fn into_stream(self) -> BoxChunkSourceStream;
+    fn into_stream(self) -> BoxSourceChunkStream;
 
     fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
         HashMap::new()
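
`SourceChunkStream` above uses the "manually expanded trait alias" idiom the comment mentions: an empty trait with the desired bounds plus a blanket impl, so call sites and IDE hints show one readable name instead of the full `Stream` bound. A self-contained sketch, assuming the `futures` crate (`Result<u64, String>` stands in for `ConnectorResult<StreamChunk>`):

```rust
use futures::stream::{self, Stream};

// The alias-as-trait: no methods, only bounds...
trait SourceChunkStream: Stream<Item = Result<u64, String>> + Send + 'static {}
// ...and a blanket impl, so every qualifying stream satisfies it automatically.
impl<T> SourceChunkStream for T where
    T: Stream<Item = Result<u64, String>> + Send + 'static
{
}

fn accepts(_: impl SourceChunkStream) {}

fn main() {
    // Any `Stream<Item = Result<u64, String>> + Send + 'static` type-checks.
    accepts(stream::iter(vec![Ok(1), Err("boom".to_string())]));
}
```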

src/connector/src/source/cdc/source/reader.rs

Lines changed: 2 additions & 2 deletions
@@ -33,7 +33,7 @@ use crate::parser::ParserConfig;
 use crate::source::base::SourceMessage;
 use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit};
 use crate::source::{
-    into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData,
+    into_chunk_stream, BoxSourceChunkStream, Column, SourceContextRef, SplitId, SplitMetaData,
     SplitReader,
 };
 
@@ -199,7 +199,7 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
         Ok(instance)
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         let parser_config = self.parser_config.clone();
         let source_context = self.source_ctx.clone();
         into_chunk_stream(self.into_data_stream(), parser_config, source_context)

src/connector/src/source/common.rs

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@ pub(crate) async fn into_chunk_stream(
     let parser =
         crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
     #[for_await]
-    for chunk in parser.into_stream(data_stream) {
+    for chunk in parser.parse_stream(data_stream) {
         yield chunk?;
     }
 }
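
This `into_chunk_stream` helper is the glue used by the `SplitReader::into_stream` impls in this commit: create a parser, then forward everything it yields. The shape is "flatten message batches through a parse step"; a dependency-light sketch assuming only the `futures` crate (uppercased strings stand in for parsed chunks):

```rust
use futures::executor::block_on_stream;
use futures::stream::{self, StreamExt};

fn main() {
    // Reader side: messages arrive in batches, like `Vec<SourceMessage>`.
    let msg_stream = stream::iter(vec![vec!["a", "b"], vec!["c"]]);
    // Parser side: each batch becomes zero or more output chunks.
    let chunk_stream =
        msg_stream.flat_map(|batch| stream::iter(batch).map(|m| m.to_uppercase()));
    let chunks: Vec<String> = block_on_stream(chunk_stream.boxed_local()).collect();
    assert_eq!(chunks, vec!["A", "B", "C"]);
}
```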

src/connector/src/source/datagen/source/reader.rs

Lines changed: 2 additions & 2 deletions
@@ -28,7 +28,7 @@ use crate::source::data_gen_util::spawn_data_generation_stream;
 use crate::source::datagen::source::SEQUENCE_FIELD_KIND;
 use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc};
 use crate::source::{
-    into_chunk_stream, BoxChunkSourceStream, Column, DataType, SourceContextRef, SourceMessage,
+    into_chunk_stream, BoxSourceChunkStream, Column, DataType, SourceContextRef, SourceMessage,
     SplitId, SplitMetaData, SplitReader,
 };
 
@@ -142,7 +142,7 @@ impl SplitReader for DatagenSplitReader {
         })
     }
 
-    fn into_stream(self) -> BoxChunkSourceStream {
+    fn into_stream(self) -> BoxSourceChunkStream {
         // Will buffer at most 4 event chunks.
         const BUFFER_SIZE: usize = 4;
         // spawn_data_generation_stream(self.generator.into_native_stream(), BUFFER_SIZE).boxed()
