Skip to content

Commit e51f639

Browse files
author
Eric Fu
authored
fix: improve CDC connector param check (risingwavelabs#8450)
1 parent b7c46d4 commit e51f639

File tree

10 files changed

+68
-36
lines changed

10 files changed

+68
-36
lines changed

e2e_test/source/cdc/cdc.load.slt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ create table shipments (
5757
username = 'postgres',
5858
password = 'postgres',
5959
database.name = 'cdc_test',
60-
schema.name = 'public',
6160
table.name = 'shipments',
6261
slot.name = 'shipments'
6362
);

e2e_test/source/cdc/cdc.validate.postgres.slt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ create table shipments (
1616
username = 'posres',
1717
password = 'postgres',
1818
database.name = 'cdc_test',
19-
schema.name = 'public',
2019
table.name = 'shipments',
2120
slot.name = 'shipments'
2221
);
@@ -38,7 +37,6 @@ create table shipments (
3837
username = 'postgres',
3938
password = 'otgres',
4039
database.name = 'cdc_test',
41-
schema.name = 'public',
4240
table.name = 'shipments',
4341
slot.name = 'shipments'
4442
);
@@ -59,7 +57,6 @@ create table shipments (
5957
username = 'postgres',
6058
password = 'postgres',
6159
database.name = 'cdc_test',
62-
schema.name = 'public',
6360
table.name = 'shipment',
6461
slot.name = 'shipments'
6562
);

java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/ConnectorConfig.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,22 @@
1717
import java.util.HashMap;
1818
import java.util.Map;
1919

20-
public class ConnectorConfig {
20+
public class ConnectorConfig extends HashMap<String, String> {
21+
22+
public ConnectorConfig() {}
23+
24+
public ConnectorConfig(Map<? extends String, ? extends String> m) {
25+
super(m);
26+
}
27+
28+
public String getNonNull(String key) {
29+
String value = super.get(key);
30+
if (value == null) {
31+
throw new RuntimeException(key + " cannot be null");
32+
}
33+
return value;
34+
}
35+
2136
/* Common configs */
2237
public static final String HOST = "hostname";
2338
public static final String PORT = "port";

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/SourceRequestHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void handle(ConnectorServiceProto.GetEventStreamRequest request) {
5151
SourceTypeE.valueOf(startRequest.getSourceType()),
5252
startRequest.getSourceId(),
5353
startRequest.getStartOffset(),
54-
startRequest.getPropertiesMap());
54+
new ConnectorConfig(startRequest.getPropertiesMap()));
5555
if (handler == null) {
5656
LOG.error("failed to create source handler");
5757
responseObserver.onCompleted();

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/core/SourceHandlerFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,19 @@
1414

1515
package com.risingwave.sourcenode.core;
1616

17+
import com.risingwave.connector.api.source.ConnectorConfig;
1718
import com.risingwave.connector.api.source.SourceHandler;
1819
import com.risingwave.connector.api.source.SourceTypeE;
1920
import com.risingwave.sourcenode.mysql.MySqlSourceConfig;
2021
import com.risingwave.sourcenode.postgres.PostgresSourceConfig;
21-
import java.util.Map;
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
2424

2525
public abstract class SourceHandlerFactory {
2626
static final Logger LOG = LoggerFactory.getLogger(SourceHandlerFactory.class);
2727

2828
public static SourceHandler createSourceHandler(
29-
SourceTypeE type, long sourceId, String startOffset, Map<String, String> userProps) {
29+
SourceTypeE type, long sourceId, String startOffset, ConnectorConfig userProps) {
3030
switch (type) {
3131
case MYSQL:
3232
return DefaultSourceHandler.newWithConfig(

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/mysql/MySqlSourceConfig.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.risingwave.connector.api.source.SourceTypeE;
2020
import com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore;
2121
import com.risingwave.sourcenode.common.DebeziumCdcUtils;
22-
import java.util.Map;
2322
import java.util.Properties;
2423

2524
/** MySQL Source Config */
@@ -29,7 +28,7 @@ public class MySqlSourceConfig implements SourceConfig {
2928
private final long id;
3029
private final String sourceName;
3130

32-
public MySqlSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
31+
public MySqlSourceConfig(long sourceId, String startOffset, ConnectorConfig userProps) {
3332
id = sourceId;
3433
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
3534
props.setProperty(
@@ -56,9 +55,9 @@ public MySqlSourceConfig(long sourceId, String startOffset, Map<String, String>
5655
props.setProperty("database.include.list", userProps.get(ConnectorConfig.DB_NAME));
5756
// only captures data of the specified table
5857
String tableFilter =
59-
userProps.get(ConnectorConfig.DB_NAME)
58+
userProps.getNonNull(ConnectorConfig.DB_NAME)
6059
+ "."
61-
+ userProps.get(ConnectorConfig.TABLE_NAME);
60+
+ userProps.getNonNull(ConnectorConfig.TABLE_NAME);
6261
props.setProperty("table.include.list", tableFilter);
6362

6463
// disable schema change events for current stage

java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/sourcenode/postgres/PostgresSourceConfig.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.risingwave.sourcenode.common.DebeziumCdcUtils;
2222
import io.debezium.heartbeat.Heartbeat;
2323
import java.time.Duration;
24-
import java.util.Map;
2524
import java.util.Properties;
2625

2726
/** Postgres Source Config */
@@ -32,7 +31,7 @@ public class PostgresSourceConfig implements SourceConfig {
3231
private final String sourceName;
3332
private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();
3433

35-
public PostgresSourceConfig(long sourceId, String startOffset, Map<String, String> userProps) {
34+
public PostgresSourceConfig(long sourceId, String startOffset, ConnectorConfig userProps) {
3635
id = sourceId;
3736
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
3837
props.setProperty(
@@ -46,12 +45,16 @@ public PostgresSourceConfig(long sourceId, String startOffset, Map<String, Strin
4645
props.setProperty(ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
4746
}
4847

48+
String dbName = userProps.getNonNull(ConnectorConfig.DB_NAME);
49+
String schema = userProps.getNonNull(ConnectorConfig.PG_SCHEMA_NAME);
50+
String table = userProps.getNonNull(ConnectorConfig.TABLE_NAME);
51+
4952
// Begin of connector configs
5053
props.setProperty("database.hostname", userProps.get(ConnectorConfig.HOST));
5154
props.setProperty("database.port", userProps.get(ConnectorConfig.PORT));
5255
props.setProperty("database.user", userProps.get(ConnectorConfig.USER));
5356
props.setProperty("database.password", userProps.get(ConnectorConfig.PASSWORD));
54-
props.setProperty("database.dbname", userProps.get(ConnectorConfig.DB_NAME));
57+
props.setProperty("database.dbname", dbName);
5558
// The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server.
5659
// Supported values are decoderbufs, and pgoutput.
5760
// The wal2json plug-in is deprecated and scheduled for removal.
@@ -77,24 +80,21 @@ public PostgresSourceConfig(long sourceId, String startOffset, Map<String, Strin
7780
Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
7881
Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
7982

80-
String tableFilter =
81-
userProps.get(ConnectorConfig.PG_SCHEMA_NAME)
82-
+ "."
83-
+ userProps.get(ConnectorConfig.TABLE_NAME);
83+
String tableFilter = schema + "." + table;
8484
props.setProperty("table.include.list", tableFilter);
8585
props.setProperty("database.server.name", DB_SERVER_NAME_PREFIX + tableFilter);
8686

8787
// host:port:database.schema.table
8888
sourceName =
89-
userProps.get(ConnectorConfig.HOST)
89+
userProps.getNonNull(ConnectorConfig.HOST)
9090
+ ":"
91-
+ userProps.get(ConnectorConfig.PORT)
91+
+ userProps.getNonNull(ConnectorConfig.PORT)
9292
+ ":"
93-
+ userProps.get(ConnectorConfig.DB_NAME)
93+
+ dbName
9494
+ "."
95-
+ userProps.get(ConnectorConfig.PG_SCHEMA_NAME)
95+
+ schema
9696
+ "."
97-
+ userProps.get(ConnectorConfig.TABLE_NAME);
97+
+ table;
9898
props.setProperty("name", sourceName);
9999

100100
// pass through debezium properties if any

src/frontend/src/handler/create_source.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ pub(crate) fn is_kafka_source(with_properties: &HashMap<String, String>) -> bool
185185
pub(crate) async fn resolve_source_schema(
186186
source_schema: SourceSchema,
187187
columns: &mut Vec<ColumnCatalog>,
188-
with_properties: &HashMap<String, String>,
188+
with_properties: &mut HashMap<String, String>,
189189
row_id_index: &mut Option<usize>,
190190
pk_column_ids: &mut Vec<ColumnId>,
191191
is_materialized: bool,
@@ -213,11 +213,7 @@ pub(crate) async fn resolve_source_schema(
213213

214214
columns_extend(
215215
columns,
216-
extract_protobuf_table_schema(
217-
protobuf_schema,
218-
with_properties.clone().into_iter().collect(),
219-
)
220-
.await?,
216+
extract_protobuf_table_schema(protobuf_schema, with_properties.clone()).await?,
221217
);
222218

223219
StreamSourceInfo {
@@ -525,7 +521,7 @@ fn source_shema_to_row_format(source_schema: &SourceSchema) -> RowFormatType {
525521

526522
fn validate_compatibility(
527523
source_schema: &SourceSchema,
528-
props: &HashMap<String, String>,
524+
props: &mut HashMap<String, String>,
529525
) -> Result<()> {
530526
let connector = get_connector(props);
531527
let row_format = source_shema_to_row_format(source_schema);
@@ -561,6 +557,19 @@ fn validate_compatibility(
561557
connector, row_format
562558
))));
563559
}
560+
561+
if connector == POSTGRES_CDC_CONNECTOR {
562+
if !props.contains_key("slot.name") {
563+
// Build a random slot name with UUID
564+
// e.g. "rw_cdc_f9a3567e6dd54bf5900444c8b1c03815"
565+
let uuid = uuid::Uuid::new_v4().to_string().replace('-', "");
566+
props.insert("slot.name".into(), format!("rw_cdc_{}", uuid));
567+
}
568+
if !props.contains_key("schema.name") {
569+
// Default schema name is "public"
570+
props.insert("schema.name".into(), "public".into());
571+
}
572+
}
564573
Ok(())
565574
}
566575

@@ -576,7 +585,7 @@ pub async fn handle_create_source(
576585
let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, stmt.source_name)?;
577586
let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
578587

579-
let with_properties = handler_args
588+
let mut with_properties = handler_args
580589
.with_options
581590
.inner()
582591
.clone()
@@ -606,7 +615,7 @@ pub async fn handle_create_source(
606615
let source_info = resolve_source_schema(
607616
stmt.source_schema,
608617
&mut columns,
609-
&with_properties,
618+
&mut with_properties,
610619
&mut row_id_index,
611620
&mut pk_column_ids,
612621
false,

src/frontend/src/handler/create_table.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
292292
append_only: bool,
293293
) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
294294
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?;
295-
let properties = context.with_options().inner().clone().into_iter().collect();
295+
let mut properties = context.with_options().inner().clone().into_iter().collect();
296296

297297
let (mut columns, mut pk_column_ids, mut row_id_index) =
298298
bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?;
@@ -311,7 +311,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
311311
let source_info = resolve_source_schema(
312312
source_schema,
313313
&mut columns,
314-
&properties,
314+
&mut properties,
315315
&mut row_id_index,
316316
&mut pk_column_ids,
317317
true,
@@ -322,6 +322,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
322322
context.into(),
323323
table_name,
324324
columns,
325+
properties,
325326
pk_column_ids,
326327
row_id_index,
327328
Some(source_info),
@@ -346,12 +347,14 @@ pub(crate) fn gen_create_table_plan(
346347
let definition = context.normalized_sql().to_owned();
347348
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns, &mut col_id_gen)?;
348349

350+
let properties = context.with_options().inner().clone().into_iter().collect();
349351
gen_create_table_plan_without_bind(
350352
context,
351353
table_name,
352354
column_descs,
353355
pk_column_id_from_columns,
354356
constraints,
357+
properties,
355358
definition,
356359
source_watermarks,
357360
append_only,
@@ -366,6 +369,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
366369
column_descs: Vec<ColumnDesc>,
367370
pk_column_id_from_columns: Option<ColumnId>,
368371
constraints: Vec<TableConstraint>,
372+
properties: HashMap<String, String>,
369373
definition: String,
370374
source_watermarks: Vec<SourceWatermark>,
371375
append_only: bool,
@@ -385,6 +389,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
385389
context.into(),
386390
table_name,
387391
columns,
392+
properties,
388393
pk_column_ids,
389394
row_id_index,
390395
None,
@@ -400,6 +405,7 @@ fn gen_table_plan_inner(
400405
context: OptimizerContextRef,
401406
table_name: ObjectName,
402407
columns: Vec<ColumnCatalog>,
408+
properties: HashMap<String, String>,
403409
pk_column_ids: Vec<ColumnId>,
404410
row_id_index: Option<usize>,
405411
source_info: Option<StreamSourceInfo>,
@@ -425,7 +431,7 @@ fn gen_table_plan_inner(
425431
.map(|column| column.to_protobuf())
426432
.collect_vec(),
427433
pk_column_ids: pk_column_ids.iter().map(Into::into).collect_vec(),
428-
properties: context.with_options().inner().clone().into_iter().collect(),
434+
properties,
429435
info: Some(source_info),
430436
owner: session.user_id(),
431437
watermark_descs: watermark_descs.clone(),

src/frontend/src/handler/create_table_as.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,19 @@ pub async fn handle_create_as(
8787

8888
let (graph, source, table) = {
8989
let context = OptimizerContext::from_handler_args(handler_args.clone());
90+
let properties = handler_args
91+
.with_options
92+
.inner()
93+
.clone()
94+
.into_iter()
95+
.collect();
9096
let (plan, source, table) = gen_create_table_plan_without_bind(
9197
context,
9298
table_name.clone(),
9399
column_descs,
94100
None,
95101
vec![],
102+
properties,
96103
"".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS`
97104
vec![], // No watermark should be defined in for `CREATE TABLE AS`
98105
append_only,

0 commit comments

Comments (0)