Skip to content

Commit 0239e22

Browse files
authored
fix(cdc): only allow ADD and DROP in auto schema change (#18245)
1 parent 54134df commit 0239e22

File tree

5 files changed

+114
-6
lines changed

5 files changed

+114
-6
lines changed

e2e_test/source/cdc_inline/auto_schema_change_mysql.slt

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ distribution key id NULL NULL
4242
table description rw_customers NULL NULL
4343

4444

45+
# add column
4546
system ok
4647
mysql -e "
4748
USE mytest;
@@ -64,6 +65,57 @@ primary key id NULL NULL
6465
distribution key id NULL NULL
6566
table description rw_customers NULL NULL
6667

68+
# rename column on upstream will not be replicated, since we do not support rename column
69+
system ok
70+
mysql -e "
71+
USE mytest;
72+
ALTER TABLE customers RENAME COLUMN v1 TO v11;
73+
ALTER TABLE customers CHANGE COLUMN v2 v22 decimal(5,2);
74+
"
75+
76+
sleep 3s
77+
78+
# table schema unchanges, since we reject rename column
79+
query TTTT
80+
describe rw_customers;
81+
----
82+
id bigint false NULL
83+
modified timestamp without time zone false NULL
84+
custinfo jsonb false NULL
85+
v1 character varying false NULL
86+
v2 double precision false NULL
87+
primary key id NULL NULL
88+
distribution key id NULL NULL
89+
table description rw_customers NULL NULL
90+
91+
# revert column rename on upstream
92+
system ok
93+
mysql -e "
94+
USE mytest;
95+
ALTER TABLE customers RENAME COLUMN v11 TO v1;
96+
ALTER TABLE customers CHANGE COLUMN v22 v2 double(5,2);
97+
"
98+
99+
# drop columns
100+
system ok
101+
mysql -e "
102+
USE mytest;
103+
ALTER TABLE customers DROP COLUMN modified;
104+
ALTER TABLE customers DROP COLUMN v1;
105+
ALTER TABLE customers DROP COLUMN v2;
106+
"
107+
108+
sleep 3s
109+
110+
# modified column should be dropped
111+
query TTTT
112+
describe rw_customers;
113+
----
114+
id bigint false NULL
115+
custinfo jsonb false NULL
116+
primary key id NULL NULL
117+
distribution key id NULL NULL
118+
table description rw_customers NULL NULL
67119

68120
statement ok
69121
drop source mysql_source cascade;

src/connector/src/parser/unified/debezium.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ pub fn parse_schema_change(
165165
) -> AccessResult<SchemaChangeEnvelope> {
166166
let mut schema_changes = vec![];
167167

168-
let upstream_ddl = accessor
168+
let upstream_ddl: String = accessor
169169
.access(&[UPSTREAM_DDL], &DataType::Varchar)?
170170
.to_owned_datum()
171171
.unwrap()

src/frontend/src/handler/alter_table_column.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ pub async fn replace_table_with_definition(
6969
Ok(())
7070
}
7171

72+
/// Used in auto schema change process
7273
pub async fn get_new_table_definition_for_cdc_table(
7374
session: &Arc<SessionImpl>,
7475
table_name: ObjectName,

src/frontend/src/handler/create_table.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,7 @@ pub(super) async fn handle_create_table_plan(
10151015
&constraints,
10161016
connect_properties.clone(),
10171017
wildcard_idx.is_some(),
1018+
None,
10181019
)
10191020
.await?;
10201021

@@ -1123,6 +1124,8 @@ async fn derive_schema_for_cdc_table(
11231124
constraints: &Vec<TableConstraint>,
11241125
connect_properties: WithOptionsSecResolved,
11251126
need_auto_schema_map: bool,
1127+
// original table catalog available in auto schema change process
1128+
original_catalog: Option<Arc<TableCatalog>>,
11261129
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
11271130
// read cdc table schema from external db or parsing the schema from SQL definitions
11281131
if need_auto_schema_map {
@@ -1154,10 +1157,20 @@ async fn derive_schema_for_cdc_table(
11541157
table.pk_names().clone(),
11551158
))
11561159
} else {
1157-
Ok((
1158-
bind_sql_columns(column_defs)?,
1159-
bind_sql_pk_names(column_defs, constraints)?,
1160-
))
1160+
let columns = bind_sql_columns(column_defs)?;
1161+
// For table created by `create table t (*)` the constraint is empty, we need to
1162+
// retrieve primary key names from original table catalog if available
1163+
let pk_names = if let Some(original_catalog) = original_catalog {
1164+
original_catalog
1165+
.pk
1166+
.iter()
1167+
.map(|x| original_catalog.columns[x.column_index].name().to_string())
1168+
.collect()
1169+
} else {
1170+
bind_sql_pk_names(column_defs, constraints)?
1171+
};
1172+
1173+
Ok((columns, pk_names))
11611174
}
11621175
}
11631176

@@ -1328,6 +1341,7 @@ pub async fn generate_stream_graph_for_table(
13281341
&constraints,
13291342
connect_properties.clone(),
13301343
false,
1344+
Some(original_catalog.clone()),
13311345
)
13321346
.await?;
13331347

src/meta/service/src/ddl_service.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::HashMap;
15+
use std::collections::{HashMap, HashSet};
1616
use std::sync::Arc;
1717

1818
use anyhow::anyhow;
1919
use rand::seq::SliceRandom;
2020
use rand::thread_rng;
21+
use risingwave_common::catalog::ColumnCatalog;
2122
use risingwave_common::util::column_index_mapping::ColIndexMapping;
2223
use risingwave_connector::sink::catalog::SinkId;
2324
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
@@ -966,6 +967,46 @@ impl DdlService for DdlServiceImpl {
966967
.await?;
967968

968969
for table in tables {
970+
// Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
971+
// is a subset of the other.
972+
let original_column_names: HashSet<String> = HashSet::from_iter(
973+
table
974+
.columns
975+
.iter()
976+
.map(|col| ColumnCatalog::from(col.clone()).column_desc.name),
977+
);
978+
let new_column_names: HashSet<String> = HashSet::from_iter(
979+
table_change
980+
.columns
981+
.iter()
982+
.map(|col| ColumnCatalog::from(col.clone()).column_desc.name),
983+
);
984+
if !(original_column_names.is_subset(&new_column_names)
985+
|| original_column_names.is_superset(&new_column_names))
986+
{
987+
tracing::warn!(target: "auto_schema_change",
988+
table_id = table.id,
989+
cdc_table_id = table.cdc_table_id,
990+
upstraem_ddl = table_change.upstream_ddl,
991+
original_columns = ?original_column_names,
992+
new_columns = ?new_column_names,
993+
"New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported");
994+
return Err(Status::invalid_argument(
995+
"New columns should be a subset or superset of the original columns",
996+
));
997+
}
998+
// skip the schema change if there is no change to original columns
999+
if original_column_names == new_column_names {
1000+
tracing::warn!(target: "auto_schema_change",
1001+
table_id = table.id,
1002+
cdc_table_id = table.cdc_table_id,
1003+
upstraem_ddl = table_change.upstream_ddl,
1004+
original_columns = ?original_column_names,
1005+
new_columns = ?new_column_names,
1006+
"No change to columns, skipping the schema change");
1007+
continue;
1008+
}
1009+
9691010
let latency_timer = self
9701011
.meta_metrics
9711012
.auto_schema_change_latency

0 commit comments

Comments
 (0)