Skip to content

Commit 8cdc90e

Browse files
committed
extract more common cdc logic
1 parent c11d850 commit 8cdc90e

File tree

4 files changed

+104
-104
lines changed

4 files changed

+104
-104
lines changed

src/connector/src/macros.rs

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,8 @@ macro_rules! impl_connector_properties {
180180
pub fn extract(mut props: HashMap<String, String>) -> Result<Self> {
181181
const UPSTREAM_SOURCE_KEY: &str = "connector";
182182
let connector = props.remove(UPSTREAM_SOURCE_KEY).ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
183-
if connector.ends_with("-cdc") {
183+
use $crate::source::cdc::CDC_CONNECTOR_NAME_SUFFIX;
184+
if connector.ends_with(CDC_CONNECTOR_NAME_SUFFIX) {
184185
ConnectorProperties::new_cdc_properties(&connector, props)
185186
} else {
186187
let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?;
@@ -199,3 +200,89 @@ macro_rules! impl_connector_properties {
199200
}
200201
}
201202
}
203+
204+
#[macro_export]
205+
macro_rules! impl_cdc_source_type {
206+
($({$source_type:ident, $name:expr }),*) => {
207+
$(
208+
paste!{
209+
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
210+
pub struct $source_type;
211+
impl CdcSourceTypeTrait for $source_type {
212+
const CDC_CONNECTOR_NAME: &'static str = concat!($name, "-cdc");
213+
fn source_type() -> CdcSourceType {
214+
CdcSourceType::$source_type
215+
}
216+
}
217+
218+
pub type [< $source_type DebeziumSplitEnumerator >] = DebeziumSplitEnumerator<$source_type>;
219+
}
220+
)*
221+
222+
pub enum CdcSourceType {
223+
$(
224+
$source_type,
225+
)*
226+
}
227+
228+
impl From<PbSourceType> for CdcSourceType {
229+
fn from(value: PbSourceType) -> Self {
230+
match value {
231+
PbSourceType::Unspecified => unreachable!(),
232+
$(
233+
PbSourceType::$source_type => CdcSourceType::$source_type,
234+
)*
235+
}
236+
}
237+
}
238+
239+
impl From<CdcSourceType> for PbSourceType {
240+
fn from(this: CdcSourceType) -> PbSourceType {
241+
match this {
242+
$(
243+
CdcSourceType::$source_type => PbSourceType::$source_type,
244+
)*
245+
}
246+
}
247+
}
248+
249+
impl ConnectorProperties {
250+
pub(crate) fn new_cdc_properties(
251+
connector_name: &str,
252+
properties: HashMap<String, String>,
253+
) -> std::result::Result<Self, anyhow::Error> {
254+
match connector_name {
255+
$(
256+
$source_type::CDC_CONNECTOR_NAME => paste! {
257+
Ok(Self::[< $source_type Cdc >](Box::new(CdcProperties::<$source_type> {
258+
props: properties,
259+
..Default::default()
260+
})))
261+
},
262+
)*
263+
_ => Err(anyhow::anyhow!("unexpected cdc connector '{}'", connector_name,)),
264+
}
265+
}
266+
267+
pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) {
268+
match self {
269+
$(
270+
paste! {ConnectorProperties:: [< $source_type Cdc >](c)} => {
271+
c.table_schema = table_schema;
272+
}
273+
)*
274+
_ => {}
275+
}
276+
}
277+
278+
pub fn is_cdc_connector(&self) -> bool {
279+
match self {
280+
$(
281+
paste! {ConnectorProperties:: [< $source_type Cdc >](_)} => true,
282+
)*
283+
_ => false,
284+
}
285+
}
286+
}
287+
}
288+
}

src/connector/src/source/base.rs

Lines changed: 9 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use risingwave_common::array::StreamChunk;
2727
use risingwave_common::catalog::TableId;
2828
use risingwave_common::error::{ErrorSuppressor, RwError};
2929
use risingwave_common::types::{JsonbVal, Scalar};
30-
use risingwave_pb::connector_service::PbTableSchema;
3130
use risingwave_pb::source::ConnectorSplit;
3231
use risingwave_rpc_client::ConnectorClient;
3332

@@ -299,7 +298,7 @@ pub enum ConnectorProperties {
299298
Nexmark(Box<NexmarkProperties>),
300299
Datagen(Box<DatagenProperties>),
301300
S3(Box<S3Properties>),
302-
MySqlCdc(Box<CdcProperties<Mysql>>),
301+
MysqlCdc(Box<CdcProperties<Mysql>>),
303302
PostgresCdc(Box<CdcProperties<Postgres>>),
304303
CitusCdc(Box<CdcProperties<Citus>>),
305304
GooglePubsub(Box<PubsubProperties>),
@@ -308,51 +307,6 @@ pub enum ConnectorProperties {
308307
}
309308

310309
impl ConnectorProperties {
311-
fn new_cdc_properties(
312-
connector_name: &str,
313-
properties: HashMap<String, String>,
314-
) -> Result<Self> {
315-
match connector_name {
316-
MYSQL_CDC_CONNECTOR => Ok(Self::MySqlCdc(Box::new(CdcProperties::<Mysql> {
317-
props: properties,
318-
..Default::default()
319-
}))),
320-
POSTGRES_CDC_CONNECTOR => Ok(Self::PostgresCdc(Box::new(CdcProperties::<Postgres> {
321-
props: properties,
322-
..Default::default()
323-
}))),
324-
CITUS_CDC_CONNECTOR => Ok(Self::CitusCdc(Box::new(CdcProperties::<Citus> {
325-
props: properties,
326-
..Default::default()
327-
}))),
328-
_ => Err(anyhow!("unexpected cdc connector '{}'", connector_name,)),
329-
}
330-
}
331-
332-
pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) {
333-
match self {
334-
ConnectorProperties::MySqlCdc(c) => {
335-
c.table_schema = table_schema;
336-
}
337-
ConnectorProperties::PostgresCdc(c) => {
338-
c.table_schema = table_schema;
339-
}
340-
ConnectorProperties::CitusCdc(c) => {
341-
c.table_schema = table_schema;
342-
}
343-
_ => {}
344-
}
345-
}
346-
347-
pub fn is_cdc_connector(&self) -> bool {
348-
matches!(
349-
self,
350-
ConnectorProperties::MySqlCdc(_)
351-
| ConnectorProperties::PostgresCdc(_)
352-
| ConnectorProperties::CitusCdc(_)
353-
)
354-
}
355-
356310
pub fn support_multiple_splits(&self) -> bool {
357311
matches!(self, ConnectorProperties::Kafka(_))
358312
}
@@ -366,7 +320,7 @@ pub enum SplitImpl {
366320
Nexmark(NexmarkSplit),
367321
Datagen(DatagenSplit),
368322
GooglePubsub(PubsubSplit),
369-
MySqlCdc(DebeziumCdcSplit<Mysql>),
323+
MysqlCdc(DebeziumCdcSplit<Mysql>),
370324
PostgresCdc(DebeziumCdcSplit<Postgres>),
371325
CitusCdc(DebeziumCdcSplit<Citus>),
372326
Nats(NatsSplit),
@@ -399,7 +353,7 @@ pub enum SplitReaderImpl {
399353
Nexmark(Box<NexmarkSplitReader>),
400354
Pulsar(Box<PulsarSplitReader>),
401355
Datagen(Box<DatagenSplitReader>),
402-
MySqlCdc(Box<CdcSplitReader<Mysql>>),
356+
MysqlCdc(Box<CdcSplitReader<Mysql>>),
403357
PostgresCdc(Box<CdcSplitReader<Postgres>>),
404358
CitusCdc(Box<CdcSplitReader<Citus>>),
405359
GooglePubsub(Box<PubsubSplitReader>),
@@ -412,7 +366,7 @@ pub enum SplitEnumeratorImpl {
412366
Kinesis(KinesisSplitEnumerator),
413367
Nexmark(NexmarkSplitEnumerator),
414368
Datagen(DatagenSplitEnumerator),
415-
MySqlCdc(MysqlDebeziumSplitEnumerator),
369+
MysqlCdc(MysqlDebeziumSplitEnumerator),
416370
PostgresCdc(PostgresDebeziumSplitEnumerator),
417371
CitusCdc(CitusDebeziumSplitEnumerator),
418372
GooglePubsub(PubsubSplitEnumerator),
@@ -437,7 +391,7 @@ impl_split_enumerator! {
437391
{ Kinesis, KinesisSplitEnumerator },
438392
{ Nexmark, NexmarkSplitEnumerator },
439393
{ Datagen, DatagenSplitEnumerator },
440-
{ MySqlCdc, DebeziumSplitEnumerator },
394+
{ MysqlCdc, DebeziumSplitEnumerator },
441395
{ PostgresCdc, DebeziumSplitEnumerator },
442396
{ CitusCdc, DebeziumSplitEnumerator },
443397
{ GooglePubsub, PubsubSplitEnumerator},
@@ -452,7 +406,7 @@ impl_split! {
452406
{ Nexmark, NEXMARK_CONNECTOR, NexmarkSplit },
453407
{ Datagen, DATAGEN_CONNECTOR, DatagenSplit },
454408
{ GooglePubsub, GOOGLE_PUBSUB_CONNECTOR, PubsubSplit },
455-
{ MySqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit<Mysql> },
409+
{ MysqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit<Mysql> },
456410
{ PostgresCdc, POSTGRES_CDC_CONNECTOR, DebeziumCdcSplit<Postgres> },
457411
{ CitusCdc, CITUS_CDC_CONNECTOR, DebeziumCdcSplit<Citus> },
458412
{ S3, S3_CONNECTOR, FsSplit },
@@ -466,7 +420,7 @@ impl_split_reader! {
466420
{ Kinesis, KinesisSplitReader },
467421
{ Nexmark, NexmarkSplitReader },
468422
{ Datagen, DatagenSplitReader },
469-
{ MySqlCdc, CdcSplitReader},
423+
{ MysqlCdc, CdcSplitReader},
470424
{ PostgresCdc, CdcSplitReader},
471425
{ CitusCdc, CdcSplitReader },
472426
{ GooglePubsub, PubsubSplitReader },
@@ -565,7 +519,7 @@ mod tests {
565519
let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
566520
let mysql_split = MySqlCdcSplit::new(1001, offset_str.to_string());
567521
let split = DebeziumCdcSplit::new(Some(mysql_split), None);
568-
let split_impl = SplitImpl::MySqlCdc(split);
522+
let split_impl = SplitImpl::MysqlCdc(split);
569523
let encoded_split = split_impl.encode_to_bytes();
570524
let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
571525
assert_eq!(
@@ -653,7 +607,7 @@ mod tests {
653607
));
654608

655609
let conn_props = ConnectorProperties::extract(user_props_mysql).unwrap();
656-
if let ConnectorProperties::MySqlCdc(c) = conn_props {
610+
if let ConnectorProperties::MysqlCdc(c) = conn_props {
657611
assert_eq!(c.props.get("connector_node_addr").unwrap(), "localhost");
658612
assert_eq!(c.props.get("database.hostname").unwrap(), "127.0.0.1");
659613
assert_eq!(c.props.get("database.port").unwrap(), "3306");

src/connector/src/source/cdc/mod.rs

Lines changed: 6 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,15 @@ use std::marker::PhantomData;
2121
pub use enumerator::*;
2222
use paste::paste;
2323
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
24-
use risingwave_pb::connector_service::{PbSourceType, SourceType, TableSchema};
24+
use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
2525
pub use source::*;
2626
pub use split::*;
2727

28+
use crate::impl_cdc_source_type;
29+
use crate::source::ConnectorProperties;
30+
31+
pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
32+
2833
pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
2934
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
3035
pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
@@ -34,52 +39,6 @@ pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static {
3439
fn source_type() -> CdcSourceType;
3540
}
3641

37-
macro_rules! impl_cdc_source_type {
38-
($({$source_type:ident, $name:expr }),*) => {
39-
$(
40-
paste!{
41-
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
42-
pub struct $source_type;
43-
impl CdcSourceTypeTrait for $source_type {
44-
const CDC_CONNECTOR_NAME: &'static str = concat!($name, "-cdc");
45-
fn source_type() -> CdcSourceType {
46-
CdcSourceType::$source_type
47-
}
48-
}
49-
50-
pub type [< $source_type DebeziumSplitEnumerator >] = DebeziumSplitEnumerator<$source_type>;
51-
}
52-
)*
53-
54-
pub enum CdcSourceType {
55-
$(
56-
$source_type,
57-
)*
58-
}
59-
60-
impl From<PbSourceType> for CdcSourceType {
61-
fn from(value: PbSourceType) -> Self {
62-
match value {
63-
PbSourceType::Unspecified => unreachable!(),
64-
$(
65-
PbSourceType::$source_type => CdcSourceType::$source_type,
66-
)*
67-
}
68-
}
69-
}
70-
71-
impl From<CdcSourceType> for PbSourceType {
72-
fn from(this: CdcSourceType) -> PbSourceType {
73-
match this {
74-
$(
75-
CdcSourceType::$source_type => PbSourceType::$source_type,
76-
)*
77-
}
78-
}
79-
}
80-
}
81-
}
82-
8342
impl_cdc_source_type!({ Mysql, "mysql" }, { Postgres, "postgres" }, { Citus, "citus" });
8443

8544
#[derive(Clone, Debug, Default)]

src/stream/src/executor/backfill/cdc_backfill.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
492492
.set(key.into(), JsonbVal::from(Value::Bool(true)))
493493
.await?;
494494

495-
if let Some(SplitImpl::MySqlCdc(split)) = cdc_split.as_mut()
495+
if let Some(SplitImpl::MysqlCdc(split)) = cdc_split.as_mut()
496496
&& let Some(s) = split.mysql_split.as_mut() {
497497
let start_offset =
498498
last_binlog_offset.as_ref().map(|cdc_offset| {

0 commit comments

Comments
 (0)