Skip to content

Commit ec0027e

Browse files
authored
refactor(source): implement the common split reader as a generic function (risingwavelabs#12059)
1 parent 5cf0351 commit ec0027e

File tree

10 files changed

+138
-111
lines changed

10 files changed

+138
-111
lines changed

src/connector/src/macros.rs

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -180,57 +180,3 @@ macro_rules! impl_connector_properties {
180180
}
181181
}
182182
}
183-
184-
#[macro_export]
185-
macro_rules! impl_common_split_reader_logic {
186-
($reader:ty, $props:ty) => {
187-
impl $reader {
188-
#[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = risingwave_common::error::RwError)]
189-
pub(crate) async fn into_chunk_stream(self) {
190-
let parser_config = self.parser_config.clone();
191-
let actor_id = self.source_ctx.source_info.actor_id.to_string();
192-
let source_id = self.source_ctx.source_info.source_id.to_string();
193-
let metrics = self.source_ctx.metrics.clone();
194-
let source_ctx = self.source_ctx.clone();
195-
196-
let data_stream = self.into_data_stream();
197-
198-
let data_stream = data_stream
199-
.inspect_ok(move |data_batch| {
200-
let mut by_split_id = std::collections::HashMap::new();
201-
202-
for msg in data_batch {
203-
by_split_id
204-
.entry(msg.split_id.as_ref())
205-
.or_insert_with(Vec::new)
206-
.push(msg);
207-
}
208-
209-
for (split_id, msgs) in by_split_id {
210-
metrics
211-
.partition_input_count
212-
.with_label_values(&[&actor_id, &source_id, split_id])
213-
.inc_by(msgs.len() as u64);
214-
215-
let sum_bytes = msgs
216-
.iter()
217-
.flat_map(|msg| msg.payload.as_ref().map(|p| p.len() as u64))
218-
.sum();
219-
220-
metrics
221-
.partition_input_bytes
222-
.with_label_values(&[&actor_id, &source_id, &split_id])
223-
.inc_by(sum_bytes);
224-
}
225-
}).boxed();
226-
227-
let parser =
228-
$crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
229-
#[for_await]
230-
for msg_batch in parser.into_stream(data_stream) {
231-
yield msg_batch?;
232-
}
233-
}
234-
}
235-
};
236-
}

src/connector/src/source/cdc/source/reader.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,20 @@ use std::str::FromStr;
1616

1717
use anyhow::{anyhow, Result};
1818
use async_trait::async_trait;
19-
use futures::{pin_mut, StreamExt, TryStreamExt};
19+
use futures::pin_mut;
2020
use futures_async_stream::try_stream;
2121
use risingwave_common::util::addr::HostAddr;
2222
use risingwave_pb::connector_service::GetEventStreamResponse;
2323

24-
use crate::impl_common_split_reader_logic;
2524
use crate::parser::ParserConfig;
2625
use crate::source::base::SourceMessage;
2726
use crate::source::cdc::CdcProperties;
27+
use crate::source::common::{into_chunk_stream, CommonSplitReader};
2828
use crate::source::{
2929
BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitImpl, SplitMetaData,
3030
SplitReader,
3131
};
3232

33-
impl_common_split_reader_logic!(CdcSplitReader, CdcProperties);
34-
3533
pub struct CdcSplitReader {
3634
source_id: u64,
3735
start_offset: Option<String>,
@@ -90,12 +88,14 @@ impl SplitReader for CdcSplitReader {
9088
}
9189

9290
fn into_stream(self) -> BoxSourceWithStateStream {
93-
self.into_chunk_stream()
91+
let parser_config = self.parser_config.clone();
92+
let source_context = self.source_ctx.clone();
93+
into_chunk_stream(self, parser_config, source_context)
9494
}
9595
}
9696

97-
impl CdcSplitReader {
98-
#[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
97+
impl CommonSplitReader for CdcSplitReader {
98+
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
9999
async fn into_data_stream(self) {
100100
let cdc_client = self.source_ctx.connector_client.clone().ok_or_else(|| {
101101
anyhow!("connector node endpoint not specified or unable to connect to connector node")

src/connector/src/source/common.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use futures::{Stream, StreamExt, TryStreamExt};
16+
use futures_async_stream::try_stream;
17+
use risingwave_common::error::RwError;
18+
19+
use crate::parser::ParserConfig;
20+
use crate::source::{SourceContextRef, SourceMessage, SplitReader, StreamChunkWithState};
21+
22+
pub(crate) trait CommonSplitReader: SplitReader + 'static {
23+
fn into_data_stream(
24+
self,
25+
) -> impl Stream<Item = Result<Vec<SourceMessage>, anyhow::Error>> + Send;
26+
}
27+
28+
#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
29+
pub(crate) async fn into_chunk_stream(
30+
reader: impl CommonSplitReader + Send,
31+
parser_config: ParserConfig,
32+
source_ctx: SourceContextRef,
33+
) {
34+
let actor_id = source_ctx.source_info.actor_id.to_string();
35+
let source_id = source_ctx.source_info.source_id.to_string();
36+
let metrics = source_ctx.metrics.clone();
37+
38+
let data_stream = reader.into_data_stream();
39+
40+
let data_stream = data_stream
41+
.inspect_ok(move |data_batch| {
42+
let mut by_split_id = std::collections::HashMap::new();
43+
44+
for msg in data_batch {
45+
by_split_id
46+
.entry(msg.split_id.as_ref())
47+
.or_insert_with(Vec::new)
48+
.push(msg);
49+
}
50+
51+
for (split_id, msgs) in by_split_id {
52+
metrics
53+
.partition_input_count
54+
.with_label_values(&[&actor_id, &source_id, split_id])
55+
.inc_by(msgs.len() as u64);
56+
57+
let sum_bytes = msgs
58+
.iter()
59+
.flat_map(|msg| msg.payload.as_ref().map(|p| p.len() as u64))
60+
.sum();
61+
62+
metrics
63+
.partition_input_bytes
64+
.with_label_values(&[&actor_id, &source_id, split_id])
65+
.inc_by(sum_bytes);
66+
}
67+
})
68+
.boxed();
69+
70+
let parser =
71+
crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
72+
#[for_await]
73+
for msg_batch in parser.into_stream(data_stream) {
74+
yield msg_batch?;
75+
}
76+
}

src/connector/src/source/datagen/source/reader.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,20 @@ use std::collections::HashMap;
1616

1717
use anyhow::{anyhow, Result};
1818
use async_trait::async_trait;
19-
use futures::{StreamExt, TryStreamExt};
20-
use futures_async_stream::try_stream;
19+
use futures::{Stream, StreamExt, TryStreamExt};
2120
use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty};
2221

2322
use super::generator::DatagenEventGenerator;
24-
use crate::impl_common_split_reader_logic;
2523
use crate::parser::{EncodingProperties, ParserConfig, ProtocolProperties};
24+
use crate::source::common::{into_chunk_stream, CommonSplitReader};
2625
use crate::source::data_gen_util::spawn_data_generation_stream;
2726
use crate::source::datagen::source::SEQUENCE_FIELD_KIND;
2827
use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc};
2928
use crate::source::{
30-
BoxSourceStream, BoxSourceWithStateStream, Column, DataType, SourceContextRef, SplitId,
29+
BoxSourceWithStateStream, Column, DataType, SourceContextRef, SourceMessage, SplitId,
3130
SplitImpl, SplitMetaData, SplitReader,
3231
};
3332

34-
impl_common_split_reader_logic!(DatagenSplitReader, DatagenProperties);
35-
3633
pub struct DatagenSplitReader {
3734
generator: DatagenEventGenerator,
3835
assigned_split: DatagenSplit,
@@ -170,13 +167,17 @@ impl SplitReader for DatagenSplitReader {
170167
)
171168
.boxed()
172169
}
173-
_ => self.into_chunk_stream(),
170+
_ => {
171+
let parser_config = self.parser_config.clone();
172+
let source_context = self.source_ctx.clone();
173+
into_chunk_stream(self, parser_config, source_context)
174+
}
174175
}
175176
}
176177
}
177178

178-
impl DatagenSplitReader {
179-
pub(crate) fn into_data_stream(self) -> BoxSourceStream {
179+
impl CommonSplitReader for DatagenSplitReader {
180+
fn into_data_stream(self) -> impl Stream<Item = Result<Vec<SourceMessage>, anyhow::Error>> {
180181
// Will buffer at most 4 event chunks.
181182
const BUFFER_SIZE: usize = 4;
182183
spawn_data_generation_stream(self.generator.into_msg_stream(), BUFFER_SIZE).boxed()

src/connector/src/source/google_pubsub/source/reader.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,15 @@
1515
use anyhow::{anyhow, ensure, Context, Result};
1616
use async_trait::async_trait;
1717
use chrono::{NaiveDateTime, TimeZone, Utc};
18-
use futures::{StreamExt, TryStreamExt};
1918
use futures_async_stream::try_stream;
2019
use google_cloud_pubsub::client::Client;
2120
use google_cloud_pubsub::subscription::{SeekTo, Subscription};
2221
use risingwave_common::bail;
2322
use tonic::Code;
2423

2524
use super::TaggedReceivedMessage;
26-
use crate::impl_common_split_reader_logic;
2725
use crate::parser::ParserConfig;
26+
use crate::source::common::{into_chunk_stream, CommonSplitReader};
2827
use crate::source::google_pubsub::PubsubProperties;
2928
use crate::source::{
3029
BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitImpl,
@@ -33,8 +32,6 @@ use crate::source::{
3332

3433
const PUBSUB_MAX_FETCH_MESSAGES: usize = 1024;
3534

36-
impl_common_split_reader_logic!(PubsubSplitReader, PubsubProperties);
37-
3835
pub struct PubsubSplitReader {
3936
subscription: Subscription,
4037
stop_offset: Option<NaiveDateTime>,
@@ -44,8 +41,8 @@ pub struct PubsubSplitReader {
4441
source_ctx: SourceContextRef,
4542
}
4643

47-
impl PubsubSplitReader {
48-
#[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
44+
impl CommonSplitReader for PubsubSplitReader {
45+
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
4946
async fn into_data_stream(self) {
5047
loop {
5148
let pull_result = self
@@ -172,6 +169,8 @@ impl SplitReader for PubsubSplitReader {
172169
}
173170

174171
fn into_stream(self) -> BoxSourceWithStateStream {
175-
self.into_chunk_stream()
172+
let parser_config = self.parser_config.clone();
173+
let source_context = self.source_ctx.clone();
174+
into_chunk_stream(self, parser_config, source_context)
176175
}
177176
}

src/connector/src/source/kafka/source/reader.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
1919

2020
use anyhow::{anyhow, Result};
2121
use async_trait::async_trait;
22-
use futures::{StreamExt, TryStreamExt};
22+
use futures::StreamExt;
2323
use futures_async_stream::try_stream;
2424
use rdkafka::config::RDKafkaLogLevel;
2525
use rdkafka::consumer::{Consumer, StreamConsumer};
2626
use rdkafka::error::KafkaError;
2727
use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
2828

29-
use crate::impl_common_split_reader_logic;
3029
use crate::parser::ParserConfig;
3130
use crate::source::base::SourceMessage;
31+
use crate::source::common::{into_chunk_stream, CommonSplitReader};
3232
use crate::source::kafka::{
3333
KafkaProperties, KafkaSplit, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL,
3434
};
@@ -37,8 +37,6 @@ use crate::source::{
3737
SplitReader,
3838
};
3939

40-
impl_common_split_reader_logic!(KafkaSplitReader, KafkaProperties);
41-
4240
pub struct KafkaSplitReader {
4341
consumer: StreamConsumer<PrivateLinkConsumerContext>,
4442
offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
@@ -159,7 +157,9 @@ impl SplitReader for KafkaSplitReader {
159157
}
160158

161159
fn into_stream(self) -> BoxSourceWithStateStream {
162-
self.into_chunk_stream()
160+
let parser_config = self.parser_config.clone();
161+
let source_context = self.source_ctx.clone();
162+
into_chunk_stream(self, parser_config, source_context)
163163
}
164164
}
165165

@@ -176,9 +176,11 @@ impl KafkaSplitReader {
176176
])
177177
.set(offset);
178178
}
179+
}
179180

180-
#[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
181-
pub async fn into_data_stream(self) {
181+
impl CommonSplitReader for KafkaSplitReader {
182+
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
183+
async fn into_data_stream(self) {
182184
if self.offsets.values().all(|(start_offset, stop_offset)| {
183185
match (start_offset, stop_offset) {
184186
(Some(start), Some(stop)) if (*start + 1) >= *stop => true,

0 commit comments

Comments
 (0)