Skip to content

Commit fd2a899

Browse files
yezizp2012shanicky
andauthored
fix(sql-backend): fix some corner cases in alter rename and schema in sql backend (#14982)
Co-authored-by: Shanicky Chen <[email protected]>
1 parent 42cc7c8 commit fd2a899

File tree

8 files changed

+78
-44
lines changed

8 files changed

+78
-44
lines changed

e2e_test/batch/basic/logical_view.slt.part

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ SELECT * FROM v3;
4040
statement error
4141
DROP TABLE t;
4242

43-
statement error other relation\(s\) depend on it
43+
statement error Permission denied
4444
DROP VIEW v2;
4545

4646
statement ok

e2e_test/ddl/alter_rename.slt

+1-1
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ DROP SCHEMA schema1;
229229
statement ok
230230
DROP SINK sink1;
231231

232-
statement error other relation\(s\) depend on it
232+
statement error Permission denied
233233
DROP VIEW v5;
234234

235235
statement ok

e2e_test/ddl/dependency_check.slt

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ create index i_b1 on b(b1);
1616
statement ok
1717
create materialized view mv1 as select * from a join b on a.a1 = b.b1;
1818

19-
statement error other relation\(s\) depend on it
19+
statement error Permission denied
2020
drop index i_a1;
2121

2222
statement ok
@@ -25,7 +25,7 @@ drop materialized view mv1;
2525
statement ok
2626
create materialized view mv2 as with ctx as (select a1 from a) select b1 from b;
2727

28-
statement error other relation\(s\) depend on it
28+
statement error Permission denied
2929
drop table a;
3030

3131
statement ok
@@ -48,13 +48,13 @@ create view v2 as select * from v;
4848
statement ok
4949
create materialized view mv3 as select * from v2;
5050

51-
statement error other relation\(s\) depend on it
51+
statement error Permission denied
5252
drop source src;
5353

54-
statement error other relation\(s\) depend on it
54+
statement error Permission denied
5555
drop view v;
5656

57-
statement error other relation\(s\) depend on it
57+
statement error Permission denied
5858
drop view v2;
5959

6060
statement ok

e2e_test/source/basic/ddl.slt

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ create source s (
134134
statement ok
135135
create materialized view mv_1 as select * from s
136136

137-
statement error other relation\(s\) depend on it
137+
statement error Permission denied
138138
drop source s
139139

140140
statement ok

e2e_test/source/basic/old_row_format_syntax/ddl.slt

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ create source s (
9090
statement ok
9191
create materialized view mv_1 as select * from s
9292

93-
statement error other relation\(s\) depend on it
93+
statement error Permission denied
9494
drop source s
9595

9696
statement ok

src/meta/model_v2/src/table.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
SourceId, TableId, TableVersion,
2525
};
2626

27-
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
27+
#[derive(Clone, Debug, PartialEq, Copy, Eq, EnumIter, DeriveActiveEnum)]
2828
#[sea_orm(rs_type = "String", db_type = "String(None)")]
2929
pub enum TableType {
3030
#[sea_orm(string_value = "TABLE")]

src/meta/src/controller/catalog.rs

+59-23
Original file line numberDiff line numberDiff line change
@@ -1131,10 +1131,6 @@ impl CatalogController {
11311131
if obj.schema_id == Some(new_schema) {
11321132
return Ok(IGNORED_NOTIFICATION_VERSION);
11331133
}
1134-
1135-
let mut obj = obj.into_active_model();
1136-
obj.schema_id = Set(Some(new_schema));
1137-
let obj = obj.update(&txn).await?;
11381134
let database_id = obj.database_id.unwrap();
11391135

11401136
let mut relations = vec![];
@@ -1145,9 +1141,16 @@ impl CatalogController {
11451141
.await?
11461142
.ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
11471143
check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
1144+
let (associated_src_id, table_type) =
1145+
(table.optional_associated_source_id, table.table_type);
1146+
1147+
let mut obj = obj.into_active_model();
1148+
obj.schema_id = Set(Some(new_schema));
1149+
let obj = obj.update(&txn).await?;
1150+
relations.push(PbRelationInfo::Table(ObjectModel(table, obj).into()));
11481151

11491152
// associated source.
1150-
if let Some(associated_source_id) = table.optional_associated_source_id {
1153+
if let Some(associated_source_id) = associated_src_id {
11511154
let src_obj = object::ActiveModel {
11521155
oid: Set(associated_source_id as _),
11531156
schema_id: Set(Some(new_schema)),
@@ -1168,7 +1171,7 @@ impl CatalogController {
11681171
let (index_ids, (index_names, mut table_ids)): (
11691172
Vec<IndexId>,
11701173
(Vec<String>, Vec<TableId>),
1171-
) = if table.table_type == TableType::Table {
1174+
) = if table_type == TableType::Table {
11721175
Index::find()
11731176
.select_only()
11741177
.columns([
@@ -1186,7 +1189,6 @@ impl CatalogController {
11861189
} else {
11871190
(vec![], (vec![], vec![]))
11881191
};
1189-
relations.push(PbRelationInfo::Table(ObjectModel(table, obj).into()));
11901192

11911193
// internal tables.
11921194
let internal_tables: Vec<TableId> = Table::find()
@@ -1220,18 +1222,6 @@ impl CatalogController {
12201222
.await?;
12211223
}
12221224

1223-
if !index_ids.is_empty() {
1224-
let index_objs = Index::find()
1225-
.find_also_related(Object)
1226-
.filter(index::Column::IndexId.is_in(index_ids))
1227-
.all(&txn)
1228-
.await?;
1229-
for (index, index_obj) in index_objs {
1230-
relations.push(PbRelationInfo::Index(
1231-
ObjectModel(index, index_obj.unwrap()).into(),
1232-
));
1233-
}
1234-
}
12351225
if !table_ids.is_empty() {
12361226
let table_objs = Table::find()
12371227
.find_also_related(Object)
@@ -1244,13 +1234,29 @@ impl CatalogController {
12441234
));
12451235
}
12461236
}
1237+
if !index_ids.is_empty() {
1238+
let index_objs = Index::find()
1239+
.find_also_related(Object)
1240+
.filter(index::Column::IndexId.is_in(index_ids))
1241+
.all(&txn)
1242+
.await?;
1243+
for (index, index_obj) in index_objs {
1244+
relations.push(PbRelationInfo::Index(
1245+
ObjectModel(index, index_obj.unwrap()).into(),
1246+
));
1247+
}
1248+
}
12471249
}
12481250
ObjectType::Source => {
12491251
let source = Source::find_by_id(object_id)
12501252
.one(&txn)
12511253
.await?
12521254
.ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
12531255
check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
1256+
1257+
let mut obj = obj.into_active_model();
1258+
obj.schema_id = Set(Some(new_schema));
1259+
let obj = obj.update(&txn).await?;
12541260
relations.push(PbRelationInfo::Source(ObjectModel(source, obj).into()));
12551261
}
12561262
ObjectType::Sink => {
@@ -1259,6 +1265,10 @@ impl CatalogController {
12591265
.await?
12601266
.ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
12611267
check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
1268+
1269+
let mut obj = obj.into_active_model();
1270+
obj.schema_id = Set(Some(new_schema));
1271+
let obj = obj.update(&txn).await?;
12621272
relations.push(PbRelationInfo::Sink(ObjectModel(sink, obj).into()));
12631273

12641274
// internal tables.
@@ -1298,16 +1308,30 @@ impl CatalogController {
12981308
.await?
12991309
.ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
13001310
check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
1311+
1312+
let mut obj = obj.into_active_model();
1313+
obj.schema_id = Set(Some(new_schema));
1314+
let obj = obj.update(&txn).await?;
13011315
relations.push(PbRelationInfo::View(ObjectModel(view, obj).into()));
13021316
}
13031317
ObjectType::Function => {
13041318
let function = Function::find_by_id(object_id)
13051319
.one(&txn)
13061320
.await?
13071321
.ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
1308-
let pb_function: PbFunction = ObjectModel(function, obj).into();
1322+
1323+
let mut pb_function: PbFunction = ObjectModel(function, obj).into();
1324+
pb_function.schema_id = new_schema as _;
13091325
check_function_signature_duplicate(&pb_function, &txn).await?;
13101326

1327+
object::ActiveModel {
1328+
oid: Set(object_id),
1329+
schema_id: Set(Some(new_schema)),
1330+
..Default::default()
1331+
}
1332+
.update(&txn)
1333+
.await?;
1334+
13111335
txn.commit().await?;
13121336
let version = self
13131337
.notify_frontend(
@@ -1322,9 +1346,19 @@ impl CatalogController {
13221346
.one(&txn)
13231347
.await?
13241348
.ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
1325-
let pb_connection: PbConnection = ObjectModel(connection, obj).into();
1349+
1350+
let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
1351+
pb_connection.schema_id = new_schema as _;
13261352
check_connection_name_duplicate(&pb_connection, &txn).await?;
13271353

1354+
object::ActiveModel {
1355+
oid: Set(object_id),
1356+
schema_id: Set(Some(new_schema)),
1357+
..Default::default()
1358+
}
1359+
.update(&txn)
1360+
.await?;
1361+
13281362
txn.commit().await?;
13291363
let version = self
13301364
.notify_frontend(
@@ -1337,6 +1371,7 @@ impl CatalogController {
13371371
_ => unreachable!("not supported object type: {:?}", object_type),
13381372
}
13391373

1374+
txn.commit().await?;
13401375
let version = self
13411376
.notify_frontend(
13421377
Operation::Update,
@@ -1748,7 +1783,7 @@ impl CatalogController {
17481783
let obj = obj.unwrap();
17491784
let old_name = relation.name.clone();
17501785
relation.name = object_name.into();
1751-
if obj.obj_type != ObjectType::Index {
1786+
if obj.obj_type != ObjectType::View {
17521787
relation.definition = alter_relation_rename(&relation.definition, object_name);
17531788
}
17541789
let active_model = $table::ActiveModel {
@@ -1778,6 +1813,7 @@ impl CatalogController {
17781813
.unwrap();
17791814
index.name = object_name.into();
17801815
let index_table_id = index.index_table_id;
1816+
let old_name = rename_relation!(Table, table, table_id, index_table_id);
17811817

17821818
// the name of index and its associated table is the same.
17831819
let active_model = index::ActiveModel {
@@ -1791,7 +1827,7 @@ impl CatalogController {
17911827
ObjectModel(index, obj.unwrap()).into(),
17921828
)),
17931829
});
1794-
rename_relation!(Table, table, table_id, index_table_id)
1830+
old_name
17951831
}
17961832
_ => unreachable!("only relation name can be altered."),
17971833
};

src/meta/src/controller/streaming_job.rs

+9-11
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion
3838
use risingwave_pb::catalog::{PbCreateType, PbTable};
3939
use risingwave_pb::meta::relation::PbRelationInfo;
4040
use risingwave_pb::meta::subscribe_response::{
41-
Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
41+
Info as NotificationInfo, Operation as NotificationOperation, Operation,
4242
};
4343
use risingwave_pb::meta::table_fragments::PbActorStatus;
4444
use risingwave_pb::meta::{
@@ -543,7 +543,7 @@ impl CatalogController {
543543

544544
let table = table::ActiveModel::from(table).update(&txn).await?;
545545

546-
let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?;
546+
// let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?;
547547
// 1. replace old fragments/actors with new ones.
548548
Fragment::delete_many()
549549
.filter(fragment::Column::JobId.eq(job_id))
@@ -701,8 +701,11 @@ impl CatalogController {
701701

702702
txn.commit().await?;
703703

704-
self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings)
705-
.await;
704+
// FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
705+
// catalog and need to access the old fragment. Let frontend nodes delete the old fragment
706+
// when they receive table catalog change.
707+
// self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings)
708+
// .await;
706709
self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
707710
.await;
708711
let version = self
@@ -1200,13 +1203,8 @@ impl CatalogController {
12001203
}
12011204

12021205
txn.commit().await?;
1203-
1204-
for mapping in fragment_mapping_to_notify {
1205-
self.env
1206-
.notification_manager()
1207-
.notify_frontend(Operation::Update, Info::ParallelUnitMapping(mapping))
1208-
.await;
1209-
}
1206+
self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
1207+
.await;
12101208

12111209
Ok(())
12121210
}

0 commit comments

Comments
 (0)