Skip to content

Commit d557a6c

Browse files
authored
fix(meta): fix alter table add/drop column with indexes (risingwavelabs#8664)
1 parent 59a0947 commit d557a6c

File tree

6 files changed

+128
-5
lines changed

6 files changed

+128
-5
lines changed

e2e_test/ddl/alter_table_column.slt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,3 +209,35 @@ drop materialized view mv;
209209

210210
statement ok
211211
drop table t;
212+
213+
# Test the consistency of tables and indexes #https://github.com/risingwavelabs/risingwave/issues/8649
214+
statement ok
215+
create table t(id int primary key, a int, b varchar);
216+
217+
statement ok
218+
create index idx on t(a);
219+
220+
statement ok
221+
alter table t add column c int;
222+
223+
query IITI rowsort
224+
select * from t;
225+
----
226+
227+
statement ok
228+
drop table t;
229+
230+
statement ok
231+
create table t(id int primary key, a int, b varchar);
232+
233+
statement ok
234+
create index idx on t(b) include(b);
235+
236+
statement ok
237+
alter table t drop column a;
238+
239+
query II rowsort
240+
select * from t where b = '1';
241+
242+
statement ok
243+
drop table t;

src/frontend/src/catalog/root_catalog.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,14 @@ impl Catalog {
222222
.update_table(proto);
223223
}
224224

225+
pub fn update_index(&mut self, proto: &PbIndex) {
226+
self.get_database_mut(proto.database_id)
227+
.unwrap()
228+
.get_schema_mut(proto.schema_id)
229+
.unwrap()
230+
.update_index(proto);
231+
}
232+
225233
pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
226234
self.get_database_mut(db_id)
227235
.unwrap()

src/frontend/src/catalog/schema_catalog.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,34 @@ impl SchemaCatalog {
8585
self.table_by_id.insert(id, table_ref);
8686
}
8787

88+
pub fn update_index(&mut self, prost: &PbIndex) {
89+
let name = prost.name.clone();
90+
let id = prost.id.into();
91+
let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
92+
let primary_table = self
93+
.get_table_by_id(&prost.primary_table_id.into())
94+
.unwrap();
95+
let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
96+
let index_ref = Arc::new(index);
97+
98+
self.index_by_name.insert(name, index_ref.clone());
99+
self.index_by_id.insert(id, index_ref.clone());
100+
101+
match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
102+
Occupied(mut entry) => {
103+
let pos = entry
104+
.get()
105+
.iter()
106+
.position(|x| x.id == index_ref.id)
107+
.unwrap();
108+
*entry.get_mut().get_mut(pos).unwrap() = index_ref;
109+
}
110+
Vacant(_entry) => {
111+
unreachable!()
112+
}
113+
};
114+
}
115+
88116
pub fn drop_table(&mut self, id: TableId) {
89117
let table_ref = self.table_by_id.remove(&id).unwrap();
90118
self.table_by_name.remove(&table_ref.name).unwrap();

src/frontend/src/observer/observer_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ impl FrontendObserverNode {
220220
Operation::Delete => {
221221
catalog_guard.drop_index(index.database_id, index.schema_id, index.id.into())
222222
}
223+
Operation::Update => catalog_guard.update_index(index),
223224
_ => panic!("receive an unsupported notify {:?}", resp),
224225
},
225226
Info::View(view) => match resp.operation() {

src/meta/src/manager/catalog/mod.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ macro_rules! commit_meta {
8888
};
8989
}
9090
pub(crate) use commit_meta;
91+
use risingwave_common::util::column_index_mapping::ColIndexMapping;
92+
use risingwave_pb::expr::expr_node::RexNode;
9193
use risingwave_pb::meta::CreatingJobInfo;
9294

9395
pub type CatalogManagerRef<S> = Arc<CatalogManager<S>>;
@@ -1529,27 +1531,70 @@ where
15291531
pub async fn finish_replace_table_procedure(
15301532
&self,
15311533
table: &Table,
1534+
table_col_index_mapping: ColIndexMapping,
15321535
) -> MetaResult<NotificationVersion> {
15331536
let core = &mut *self.core.lock().await;
15341537
let database_core = &mut core.database;
15351538
let mut tables = BTreeMapTransaction::new(&mut database_core.tables);
1539+
let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes);
15361540
let key = (table.database_id, table.schema_id, table.name.clone());
15371541
assert!(
15381542
tables.contains_key(&table.id)
15391543
&& database_core.in_progress_creation_tracker.contains(&key),
15401544
"table must exist and be in altering procedure"
15411545
);
15421546

1547+
let index_ids: Vec<_> = indexes
1548+
.tree_ref()
1549+
.iter()
1550+
.filter(|(_, index)| index.primary_table_id == table.id)
1551+
.map(|(index_id, _index)| *index_id)
1552+
.collect_vec();
1553+
1554+
let mut updated_indexes = vec![];
1555+
1556+
for index_id in &index_ids {
1557+
let mut index = indexes.get_mut(*index_id).unwrap();
1558+
index
1559+
.index_item
1560+
.iter_mut()
1561+
.for_each(|x| match x.rex_node.as_mut().unwrap() {
1562+
RexNode::InputRef(input_col_idx) => {
1563+
*input_col_idx =
1564+
table_col_index_mapping.map(*input_col_idx as usize) as u32;
1565+
assert_eq!(
1566+
x.return_type,
1567+
table.columns[*input_col_idx as usize]
1568+
.column_desc
1569+
.clone()
1570+
.unwrap()
1571+
.column_type
1572+
);
1573+
}
1574+
RexNode::FuncCall(_) => unimplemented!(),
1575+
_ => unreachable!(),
1576+
});
1577+
1578+
updated_indexes.push(indexes.get(index_id).cloned().unwrap());
1579+
}
1580+
15431581
// TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must
15441582
database_core.in_progress_creation_tracker.remove(&key);
15451583

15461584
tables.insert(table.id, table.clone());
1547-
commit_meta!(self, tables)?;
1585+
commit_meta!(self, tables, indexes)?;
15481586

1549-
let version = self
1587+
// TODO: support group notification.
1588+
let mut version = self
15501589
.notify_frontend(Operation::Update, Info::Table(table.to_owned()))
15511590
.await;
15521591

1592+
for index in updated_indexes {
1593+
version = self
1594+
.notify_frontend(Operation::Update, Info::Index(index))
1595+
.await;
1596+
}
1597+
15531598
Ok(version)
15541599
}
15551600

src/meta/src/rpc/ddl_controller.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,12 @@ where
550550

551551
let result = try {
552552
let (ctx, table_fragments) = self
553-
.build_replace_table(env, &stream_job, fragment_graph, table_col_index_mapping)
553+
.build_replace_table(
554+
env,
555+
&stream_job,
556+
fragment_graph,
557+
table_col_index_mapping.clone(),
558+
)
554559
.await?;
555560

556561
self.stream_manager
@@ -559,7 +564,10 @@ where
559564
};
560565

561566
match result {
562-
Ok(_) => self.finish_replace_table(&stream_job).await,
567+
Ok(_) => {
568+
self.finish_replace_table(&stream_job, table_col_index_mapping)
569+
.await
570+
}
563571
Err(err) => {
564572
self.cancel_replace_table(&stream_job).await?;
565573
Err(err)
@@ -687,13 +695,14 @@ where
687695
async fn finish_replace_table(
688696
&self,
689697
stream_job: &StreamingJob,
698+
table_col_index_mapping: ColIndexMapping,
690699
) -> MetaResult<NotificationVersion> {
691700
let StreamingJob::Table(None, table) = stream_job else {
692701
unreachable!("unexpected job: {stream_job:?}")
693702
};
694703

695704
self.catalog_manager
696-
.finish_replace_table_procedure(table)
705+
.finish_replace_table_procedure(table, table_col_index_mapping)
697706
.await
698707
}
699708

0 commit comments

Comments
 (0)