Skip to content

Commit b11b3db

Browse files
StrikeWStrikeW
authored and
StrikeW
committed
feat(cdc): support constant default value for alter table ADD COLUMN (#18322)
1 parent 92e45bb commit b11b3db

File tree

10 files changed

+193
-67
lines changed

10 files changed

+193
-67
lines changed

e2e_test/source/cdc_inline/alter/cdc_table_alter.slt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,13 @@ select order_id, product_id, shipment_id from enriched_orders order by order_id;
131131
system ok
132132
mysql -e "
133133
USE testdb1;
134-
ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 0.0;
134+
ALTER TABLE products ADD COLUMN weight DECIMAL(10, 2) NOT NULL DEFAULT 1.1;
135135
ALTER TABLE orders ADD COLUMN order_comment VARCHAR(255);
136136
"
137137

138138
# alter cdc tables
139139
statement ok
140-
ALTER TABLE my_products ADD COLUMN weight DECIMAL;
140+
ALTER TABLE my_products ADD COLUMN weight DECIMAL DEFAULT 1.1;
141141

142142
statement ok
143143
ALTER TABLE my_orders ADD COLUMN order_comment VARCHAR;
@@ -148,9 +148,9 @@ sleep 3s
148148
query ITTT
149149
SELECT id,name,description,weight FROM my_products order by id limit 3
150150
----
151-
101 scooter Small 2-wheel scooter NULL
152-
102 car battery 12V car battery NULL
153-
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL
151+
101 scooter Small 2-wheel scooter 1.1
152+
102 car battery 12V car battery 1.1
153+
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 1.1
154154

155155

156156
# update mysql tables
@@ -169,7 +169,7 @@ SELECT id,name,description,weight FROM my_products order by id limit 3
169169
----
170170
101 scooter Small 2-wheel scooter 10.50
171171
102 car battery 12V car battery 12.50
172-
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 NULL
172+
103 12-pack drill 12-pack of drill bits with sizes ranging from #40 to #3 1.1
173173

174174
query ITTT
175175
SELECT order_id,order_date,customer_name,product_id,order_status,order_comment FROM my_orders order by order_id limit 2

e2e_test/source/cdc_inline/auto_schema_change_mysql.slt

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@ mysql -e "
1010
CREATE TABLE customers(
1111
id BIGINT PRIMARY KEY,
1212
modified DATETIME,
13+
name VARCHAR(32),
1314
custinfo JSON
1415
);
16+
INSERT INTO customers VALUES(1, NOW(), 'John', NULL);
17+
INSERT INTO customers VALUES(2, NOW(), 'Doe', NULL);
1518
ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) );
1619
"
1720

@@ -28,14 +31,15 @@ create source mysql_source with (
2831
);
2932

3033
statement ok
31-
create table rw_customers (id bigint, modified timestamp, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers';
34+
create table rw_customers (id bigint, modified timestamp, name varchar, custinfo jsonb, primary key (id)) from mysql_source table 'mytest.customers';
3235

3336
# Name, Type, Is Hidden, Description
3437
query TTTT
3538
describe rw_customers;
3639
----
3740
id bigint false NULL
3841
modified timestamp without time zone false NULL
42+
name character varying false NULL
3943
custinfo jsonb false NULL
4044
primary key id NULL NULL
4145
distribution key id NULL NULL
@@ -46,8 +50,8 @@ table description rw_customers NULL NULL
4650
system ok
4751
mysql -e "
4852
USE mytest;
49-
ALTER TABLE customers ADD COLUMN v1 VARCHAR(255);
50-
ALTER TABLE customers ADD COLUMN v2 double(5,2);
53+
ALTER TABLE customers ADD COLUMN v1 VARCHAR(255) DEFAULT 'hello';
54+
ALTER TABLE customers ADD COLUMN v2 double(5,2) DEFAULT 88.9;
5155
"
5256

5357
sleep 3s
@@ -58,13 +62,20 @@ describe rw_customers;
5862
----
5963
id bigint false NULL
6064
modified timestamp without time zone false NULL
65+
name character varying false NULL
6166
custinfo jsonb false NULL
6267
v1 character varying false NULL
6368
v2 double precision false NULL
6469
primary key id NULL NULL
6570
distribution key id NULL NULL
6671
table description rw_customers NULL NULL
6772

73+
query TTTT
74+
select id,v1,v2,name from rw_customers order by id;
75+
----
76+
1 hello 88.9 John
77+
2 hello 88.9 Doe
78+
6879
# rename column on upstream will not be replicated, since we do not support rename column
6980
system ok
7081
mysql -e "
@@ -81,6 +92,7 @@ describe rw_customers;
8192
----
8293
id bigint false NULL
8394
modified timestamp without time zone false NULL
95+
name character varying false NULL
8496
custinfo jsonb false NULL
8597
v1 character varying false NULL
8698
v2 double precision false NULL
@@ -112,6 +124,7 @@ query TTTT
112124
describe rw_customers;
113125
----
114126
id bigint false NULL
127+
name character varying false NULL
115128
custinfo jsonb false NULL
116129
primary key id NULL NULL
117130
distribution key id NULL NULL

src/common/src/catalog/column.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use itertools::Itertools;
1818
use risingwave_pb::expr::ExprNode;
1919
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
2020
use risingwave_pb::plan_common::{
21-
AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
21+
AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
2222
};
2323

2424
use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET};
@@ -140,6 +140,18 @@ impl ColumnDesc {
140140
}
141141
}
142142

143+
pub fn named_with_default_value(
144+
name: impl Into<String>,
145+
column_id: ColumnId,
146+
data_type: DataType,
147+
default_val: DefaultColumnDesc,
148+
) -> ColumnDesc {
149+
ColumnDesc {
150+
generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_val)),
151+
..Self::named(name, column_id, data_type)
152+
}
153+
}
154+
143155
pub fn named_with_additional_column(
144156
name: impl Into<String>,
145157
column_id: ColumnId,

src/connector/src/parser/unified/debezium.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@ use risingwave_common::types::{
1818
DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef,
1919
ToOwnedDatum,
2020
};
21+
use risingwave_common::util::value_encoding::DatumToProtoExt;
2122
use risingwave_connector_codec::decoder::AccessExt;
23+
use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
24+
use risingwave_pb::expr::ExprNode;
2225
use risingwave_pb::plan_common::additional_column::ColumnType;
26+
use risingwave_pb::plan_common::DefaultColumnDesc;
2327
use thiserror_ext::AsReport;
2428

2529
use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
@@ -221,7 +225,40 @@ pub fn parse_schema_change(
221225
}
222226
};
223227

224-
column_descs.push(ColumnDesc::named(name, ColumnId::placeholder(), data_type));
228+
// handle default value expression, currently we only support constant expression
229+
let column_desc = match col.access_object_field("defaultValueExpression") {
230+
Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
231+
let value_text = default_val_expr_str.as_string().unwrap();
232+
let snapshot_value: Datum = Some(
233+
ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
234+
|err| {
235+
tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to parse default value expression");
236+
AccessError::TypeError {
237+
expected: "constant expression".into(),
238+
got: data_type.to_string(),
239+
value: value_text,
240+
}},
241+
)?,
242+
);
243+
// equivalent to `Literal::to_expr_proto`
244+
let default_val_expr_node = ExprNode {
245+
function_type: ExprType::Unspecified as i32,
246+
return_type: Some(data_type.to_protobuf()),
247+
rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
248+
};
249+
ColumnDesc::named_with_default_value(
250+
name,
251+
ColumnId::placeholder(),
252+
data_type,
253+
DefaultColumnDesc {
254+
expr: Some(default_val_expr_node),
255+
snapshot_value: Some(snapshot_value.to_protobuf()),
256+
},
257+
)
258+
}
259+
_ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
260+
};
261+
column_descs.push(column_desc);
225262
}
226263
}
227264

src/connector/src/source/manager.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use risingwave_common::catalog::{
1919
TABLE_NAME_COLUMN_NAME,
2020
};
2121
use risingwave_common::types::DataType;
22+
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
2223
use risingwave_pb::plan_common::{AdditionalColumn, ColumnDescVersion};
2324

2425
/// `SourceColumnDesc` is used to describe a column in the Source.
@@ -137,11 +138,14 @@ impl From<&ColumnDesc> for SourceColumnDesc {
137138
version: _,
138139
}: &ColumnDesc,
139140
) -> Self {
140-
debug_assert!(
141-
generated_or_default_column.is_none(),
142-
"source column should not be generated or default: {:?}",
143-
generated_or_default_column.as_ref().unwrap()
144-
);
141+
if let Some(option) = generated_or_default_column {
142+
debug_assert!(
143+
matches!(option, GeneratedOrDefaultColumn::DefaultColumn(_)),
144+
"source column should not be generated: {:?}",
145+
generated_or_default_column.as_ref().unwrap()
146+
)
147+
}
148+
145149
Self {
146150
name: name.clone(),
147151
data_type: data_type.clone(),

src/frontend/src/handler/alter_table_column.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub async fn replace_table_with_definition(
5858
definition,
5959
original_catalog,
6060
source_schema,
61+
None,
6162
)
6263
.await?;
6364

@@ -73,7 +74,7 @@ pub async fn replace_table_with_definition(
7374
pub async fn get_new_table_definition_for_cdc_table(
7475
session: &Arc<SessionImpl>,
7576
table_name: ObjectName,
76-
new_columns: Vec<ColumnCatalog>,
77+
new_columns: &[ColumnCatalog],
7778
) -> Result<(Statement, Arc<TableCatalog>)> {
7879
let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
7980

@@ -96,22 +97,24 @@ pub async fn get_new_table_definition_for_cdc_table(
9697
"source schema should be None for CDC table"
9798
);
9899

99-
let orig_column_map: HashMap<String, ColumnDef> = HashMap::from_iter(
100-
original_columns
100+
let orig_column_catalog: HashMap<String, ColumnCatalog> = HashMap::from_iter(
101+
original_catalog
102+
.columns()
101103
.iter()
102-
.map(|col| (col.name.real_value(), col.clone())),
104+
.map(|col| (col.name().to_string(), col.clone())),
103105
);
104106

105107
// update the original columns with new version columns
106108
let mut new_column_defs = vec![];
107-
for col in new_columns {
108-
// if the column exists in the original definitoins, use the original column definition.
109+
for new_col in new_columns {
110+
// if the column exists in the original catalog, use it to construct the column definition.
109111
// since we don't support altering the column type right now
110-
if let Some(original_col) = orig_column_map.get(col.name()) {
111-
new_column_defs.push(original_col.clone());
112+
if let Some(original_col) = orig_column_catalog.get(new_col.name()) {
113+
let ty = to_ast_data_type(original_col.data_type())?;
114+
new_column_defs.push(ColumnDef::new(original_col.name().into(), ty, None, vec![]));
112115
} else {
113-
let ty = to_ast_data_type(col.data_type())?;
114-
new_column_defs.push(ColumnDef::new(col.name().into(), ty, None, vec![]));
116+
let ty = to_ast_data_type(new_col.data_type())?;
117+
new_column_defs.push(ColumnDef::new(new_col.name().into(), ty, None, vec![]));
115118
}
116119
}
117120
*original_columns = new_column_defs;
@@ -162,6 +165,7 @@ pub async fn get_replace_table_plan(
162165
definition: Statement,
163166
original_catalog: &Arc<TableCatalog>,
164167
source_schema: Option<ConnectorSchema>,
168+
new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
165169
) -> Result<(
166170
Option<Source>,
167171
Table,
@@ -202,6 +206,7 @@ pub async fn get_replace_table_plan(
202206
on_conflict,
203207
with_version_column,
204208
cdc_table_info,
209+
new_version_columns,
205210
)
206211
.await?;
207212

src/frontend/src/handler/create_sink.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,7 @@ pub(crate) async fn reparse_table_for_sink(
693693
on_conflict,
694694
with_version_column,
695695
None,
696+
None,
696697
)
697698
.await?;
698699

0 commit comments

Comments
 (0)