@@ -19,6 +19,7 @@ use anyhow::anyhow;
19
19
use rand:: seq:: SliceRandom ;
20
20
use rand:: thread_rng;
21
21
use risingwave_common:: catalog:: ColumnCatalog ;
22
+ use risingwave_common:: types:: DataType ;
22
23
use risingwave_common:: util:: column_index_mapping:: ColIndexMapping ;
23
24
use risingwave_connector:: sink:: catalog:: SinkId ;
24
25
use risingwave_meta:: manager:: { EventLogManagerRef , MetadataManager } ;
@@ -969,40 +970,41 @@ impl DdlService for DdlServiceImpl {
969
970
for table in tables {
970
971
// Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
971
972
// is a subset of the other.
972
- let original_column_names: HashSet < String > = HashSet :: from_iter (
973
- table
974
- . columns
975
- . iter ( )
976
- . map ( |col| ColumnCatalog :: from ( col. clone ( ) ) . column_desc . name ) ,
977
- ) ;
978
- let new_column_names: HashSet < String > = HashSet :: from_iter (
979
- table_change
980
- . columns
981
- . iter ( )
982
- . map ( |col| ColumnCatalog :: from ( col. clone ( ) ) . column_desc . name ) ,
983
- ) ;
984
- if !( original_column_names. is_subset ( & new_column_names)
985
- || original_column_names. is_superset ( & new_column_names) )
973
+ let original_columns: HashSet < ( String , DataType ) > =
974
+ HashSet :: from_iter ( table. columns . iter ( ) . map ( |col| {
975
+ let col = ColumnCatalog :: from ( col. clone ( ) ) ;
976
+ let data_type = col. data_type ( ) . clone ( ) ;
977
+ ( col. column_desc . name , data_type)
978
+ } ) ) ;
979
+ let new_columns: HashSet < ( String , DataType ) > =
980
+ HashSet :: from_iter ( table_change. columns . iter ( ) . map ( |col| {
981
+ let col = ColumnCatalog :: from ( col. clone ( ) ) ;
982
+ let data_type = col. data_type ( ) . clone ( ) ;
983
+ ( col. column_desc . name , data_type)
984
+ } ) ) ;
985
+
986
+ if !( original_columns. is_subset ( & new_columns)
987
+ || original_columns. is_superset ( & new_columns) )
986
988
{
987
989
tracing:: warn!( target: "auto_schema_change" ,
988
990
table_id = table. id,
989
991
cdc_table_id = table. cdc_table_id,
990
992
upstraem_ddl = table_change. upstream_ddl,
991
- original_columns = ?original_column_names ,
992
- new_columns = ?new_column_names ,
993
+ original_columns = ?original_columns ,
994
+ new_columns = ?new_columns ,
993
995
"New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported" ) ;
994
996
return Err ( Status :: invalid_argument (
995
997
"New columns should be a subset or superset of the original columns" ,
996
998
) ) ;
997
999
}
998
1000
// skip the schema change if there is no change to original columns
999
- if original_column_names == new_column_names {
1001
+ if original_columns == new_columns {
1000
1002
tracing:: warn!( target: "auto_schema_change" ,
1001
1003
table_id = table. id,
1002
1004
cdc_table_id = table. cdc_table_id,
1003
1005
upstraem_ddl = table_change. upstream_ddl,
1004
- original_columns = ?original_column_names ,
1005
- new_columns = ?new_column_names ,
1006
+ original_columns = ?original_columns ,
1007
+ new_columns = ?new_columns ,
1006
1008
"No change to columns, skipping the schema change" ) ;
1007
1009
continue ;
1008
1010
}
0 commit comments