Skip to content

Commit 7302424

Browse files
authored
feat(meta): support group notification (risingwavelabs#8741)
1 parent be9723e commit 7302424

File tree

9 files changed

+380
-182
lines changed

9 files changed

+380
-182
lines changed

proto/meta.proto

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,21 @@ message MetaSnapshot {
251251
SnapshotVersion version = 13;
252252
}
253253

254+
message Relation {
255+
oneof relation_info {
256+
catalog.Table table = 1;
257+
catalog.Source source = 2;
258+
catalog.Sink sink = 3;
259+
catalog.Index index = 4;
260+
catalog.View view = 5;
261+
catalog.Function function = 6;
262+
}
263+
}
264+
265+
message RelationGroup {
266+
repeated Relation relations = 1;
267+
}
268+
254269
message SubscribeResponse {
255270
enum Operation {
256271
UNSPECIFIED = 0;
@@ -265,12 +280,6 @@ message SubscribeResponse {
265280
oneof info {
266281
catalog.Database database = 4;
267282
catalog.Schema schema = 5;
268-
catalog.Table table = 6;
269-
catalog.Source source = 7;
270-
catalog.Sink sink = 8;
271-
catalog.Index index = 9;
272-
catalog.View view = 10;
273-
catalog.Function function = 18;
274283
user.UserInfo user = 11;
275284
FragmentParallelUnitMapping parallel_unit_mapping = 12;
276285
common.WorkerNode node = 13;
@@ -280,6 +289,7 @@ message SubscribeResponse {
280289
backup_service.MetaBackupManifestId meta_backup_manifest_id = 17;
281290
SystemParams system_params = 19;
282291
hummock.WriteLimits hummock_write_limits = 20;
292+
RelationGroup relation_group = 21;
283293
}
284294
}
285295

src/common/common_service/src/observer_manager.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,7 @@ where
116116
};
117117

118118
notification_vec.retain_mut(|notification| match notification.info.as_ref().unwrap() {
119-
Info::Database(_)
120-
| Info::Schema(_)
121-
| Info::Table(_)
122-
| Info::Source(_)
123-
| Info::Sink(_)
124-
| Info::Index(_)
125-
| Info::View(_)
126-
| Info::Function(_)
127-
| Info::User(_) => {
119+
Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) | Info::User(_) => {
128120
notification.version > info.version.as_ref().unwrap().catalog_version
129121
}
130122
Info::ParallelUnitMapping(_) => {

src/frontend/src/observer/observer_manager.rs

Lines changed: 71 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use risingwave_common::hash::ParallelUnitMapping;
2020
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
2121
use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend};
2222
use risingwave_pb::common::WorkerNode;
23+
use risingwave_pb::meta::relation::RelationInfo;
2324
use risingwave_pb::meta::subscribe_response::{Info, Operation};
2425
use risingwave_pb::meta::{FragmentParallelUnitMapping, SubscribeResponse};
2526
use tokio::sync::watch::Sender;
@@ -49,14 +50,7 @@ impl ObserverState for FrontendObserverNode {
4950
};
5051

5152
match info.to_owned() {
52-
Info::Database(_)
53-
| Info::Schema(_)
54-
| Info::Table(_)
55-
| Info::Source(_)
56-
| Info::Index(_)
57-
| Info::Sink(_)
58-
| Info::Function(_)
59-
| Info::View(_) => {
53+
Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) => {
6054
self.handle_catalog_notification(resp);
6155
}
6256
Info::Node(node) => {
@@ -193,52 +187,76 @@ impl FrontendObserverNode {
193187
Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id),
194188
_ => panic!("receive an unsupported notify {:?}", resp),
195189
},
196-
Info::Table(table) => match resp.operation() {
197-
Operation::Add => catalog_guard.create_table(table),
198-
Operation::Delete => {
199-
catalog_guard.drop_table(table.database_id, table.schema_id, table.id.into())
200-
}
201-
Operation::Update => catalog_guard.update_table(table),
202-
_ => panic!("receive an unsupported notify {:?}", resp),
203-
},
204-
Info::Source(source) => match resp.operation() {
205-
Operation::Add => catalog_guard.create_source(source),
206-
Operation::Delete => {
207-
catalog_guard.drop_source(source.database_id, source.schema_id, source.id)
208-
}
209-
_ => panic!("receive an unsupported notify {:?}", resp),
210-
},
211-
Info::Sink(sink) => match resp.operation() {
212-
Operation::Add => catalog_guard.create_sink(sink),
213-
Operation::Delete => {
214-
catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
215-
}
216-
_ => panic!("receive an unsupported notify {:?}", resp),
217-
},
218-
Info::Index(index) => match resp.operation() {
219-
Operation::Add => catalog_guard.create_index(index),
220-
Operation::Delete => {
221-
catalog_guard.drop_index(index.database_id, index.schema_id, index.id.into())
222-
}
223-
Operation::Update => catalog_guard.update_index(index),
224-
_ => panic!("receive an unsupported notify {:?}", resp),
225-
},
226-
Info::View(view) => match resp.operation() {
227-
Operation::Add => catalog_guard.create_view(view),
228-
Operation::Delete => {
229-
catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
190+
Info::RelationGroup(relation_group) => {
191+
for relation in &relation_group.relations {
192+
let Some(relation) = relation.relation_info.as_ref() else {
193+
continue;
194+
};
195+
match relation {
196+
RelationInfo::Table(table) => match resp.operation() {
197+
Operation::Add => catalog_guard.create_table(table),
198+
Operation::Delete => catalog_guard.drop_table(
199+
table.database_id,
200+
table.schema_id,
201+
table.id.into(),
202+
),
203+
Operation::Update => {
204+
let old_table =
205+
catalog_guard.get_table_by_id(&table.id.into()).unwrap();
206+
catalog_guard.update_table(table);
207+
assert!(old_table.fragment_id != table.fragment_id);
208+
// FIXME: the frontend node delete its fragment for the update
209+
// operation by itself.
210+
self.worker_node_manager
211+
.remove_fragment_mapping(&old_table.fragment_id);
212+
}
213+
_ => panic!("receive an unsupported notify {:?}", resp),
214+
},
215+
RelationInfo::Source(source) => match resp.operation() {
216+
Operation::Add => catalog_guard.create_source(source),
217+
Operation::Delete => catalog_guard.drop_source(
218+
source.database_id,
219+
source.schema_id,
220+
source.id,
221+
),
222+
_ => panic!("receive an unsupported notify {:?}", resp),
223+
},
224+
RelationInfo::Sink(sink) => match resp.operation() {
225+
Operation::Add => catalog_guard.create_sink(sink),
226+
Operation::Delete => {
227+
catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
228+
}
229+
_ => panic!("receive an unsupported notify {:?}", resp),
230+
},
231+
RelationInfo::Index(index) => match resp.operation() {
232+
Operation::Add => catalog_guard.create_index(index),
233+
Operation::Delete => catalog_guard.drop_index(
234+
index.database_id,
235+
index.schema_id,
236+
index.id.into(),
237+
),
238+
Operation::Update => catalog_guard.update_index(index),
239+
_ => panic!("receive an unsupported notify {:?}", resp),
240+
},
241+
RelationInfo::View(view) => match resp.operation() {
242+
Operation::Add => catalog_guard.create_view(view),
243+
Operation::Delete => {
244+
catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
245+
}
246+
_ => panic!("receive an unsupported notify {:?}", resp),
247+
},
248+
RelationInfo::Function(function) => match resp.operation() {
249+
Operation::Add => catalog_guard.create_function(function),
250+
Operation::Delete => catalog_guard.drop_function(
251+
function.database_id,
252+
function.schema_id,
253+
function.id.into(),
254+
),
255+
_ => panic!("receive an unsupported notify {:?}", resp),
256+
},
257+
}
230258
}
231-
_ => panic!("receive an unsupported notify {:?}", resp),
232-
},
233-
Info::Function(function) => match resp.operation() {
234-
Operation::Add => catalog_guard.create_function(function),
235-
Operation::Delete => catalog_guard.drop_function(
236-
function.database_id,
237-
function.schema_id,
238-
function.id.into(),
239-
),
240-
_ => panic!("receive an unsupported notify {:?}", resp),
241-
},
259+
}
242260
_ => unreachable!(),
243261
}
244262
assert!(

src/meta/src/hummock/manager/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ use risingwave_hummock_sdk::table_stats::{
166166
use risingwave_pb::catalog::Table;
167167
use risingwave_pb::hummock::version_update_payload::Payload;
168168
use risingwave_pb::hummock::PbCompactionGroupInfo;
169+
use risingwave_pb::meta::relation::RelationInfo;
169170

170171
/// Acquire write lock of the lock with `lock_name`.
171172
/// The macro will use macro `function_name` to get the name of the function of method that calls
@@ -1732,11 +1733,11 @@ where
17321733
for table in table_catalogs {
17331734
self.env
17341735
.notification_manager()
1735-
.notify_hummock(Operation::Add, Info::Table(table.clone()))
1736+
.notify_hummock_relation_info(Operation::Add, RelationInfo::Table(table.clone()))
17361737
.await;
17371738
self.env
17381739
.notification_manager()
1739-
.notify_compactor(Operation::Add, Info::Table(table))
1740+
.notify_compactor_relation_info(Operation::Add, RelationInfo::Table(table))
17401741
.await;
17411742
}
17421743

src/meta/src/manager/catalog/fragment.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ where
279279
// FIXME: we use a dummy table ID for new table fragments, so we can drop the old fragments
280280
// with the real table ID, then replace the dummy table ID with the real table ID. This is a
281281
// workaround for not having the version info in the fragment manager.
282+
#[allow(unused_variables)]
282283
let old_table_fragment = table_fragments
283284
.remove(table_id)
284285
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;
@@ -344,8 +345,10 @@ where
344345
// Commit changes and notify about the changes.
345346
commit_meta!(self, table_fragments)?;
346347

347-
self.notify_fragment_mapping(&old_table_fragment, Operation::Delete)
348-
.await;
348+
// FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
349+
// catalog and need to access the old fragment. Let frontend nodes delete the old fragment
350+
// when they receive table catalog change. self.notify_fragment_mapping(&
351+
// old_table_fragment, Operation::Delete) .await;
349352
self.notify_fragment_mapping(&table_fragment, Operation::Add)
350353
.await;
351354

0 commit comments

Comments
 (0)