Skip to content

Commit 4fc249b

Browse files
committed
feat(ctl): list serving fragment mappings
1 parent 8eb0e43 commit 4fc249b

File tree

11 files changed

+162
-7
lines changed

11 files changed

+162
-7
lines changed

proto/meta.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,3 +411,13 @@ service SystemParamsService {
411411
rpc GetSystemParams(GetSystemParamsRequest) returns (GetSystemParamsResponse);
412412
rpc SetSystemParam(SetSystemParamRequest) returns (SetSystemParamResponse);
413413
}
414+
415+
message GetServingVnodeMappingsRequest {}
416+
417+
message GetServingVnodeMappingsResponse {
418+
repeated FragmentParallelUnitMapping mappings = 1;
419+
}
420+
421+
service ServingService {
422+
rpc GetServingVnodeMappings(GetServingVnodeMappingsRequest) returns (GetServingVnodeMappingsResponse);
423+
}

src/ctl/src/cmd_impl/meta.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ mod cluster_info;
1717
mod connection;
1818
mod pause_resume;
1919
mod reschedule;
20+
mod serving;
2021

2122
pub use backup_meta::*;
2223
pub use cluster_info::*;
2324
pub use connection::*;
2425
pub use pause_resume::*;
2526
pub use reschedule::*;
27+
pub use serving::*;

src/ctl/src/cmd_impl/meta/serving.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use comfy_table::{Cell, Row, Table};
16+
use itertools::Itertools;
17+
18+
use crate::CtlContext;
19+
20+
pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Result<()> {
21+
let meta_client = context.meta_client().await?;
22+
let mappings = meta_client.list_serving_vnode_mappings().await?;
23+
24+
let mut table = Table::new();
25+
// Parallel Unit, Frag 1, Frag 2, ..., Frag N
26+
table.set_header({
27+
let mut row = Row::new();
28+
row.add_cell("Parallel Unit".into());
29+
for &fid in mappings.keys() {
30+
let cell = Cell::new(format!("Frag {fid}"));
31+
row.add_cell(cell);
32+
}
33+
row
34+
});
35+
36+
let all_parallel_units = mappings
37+
.values()
38+
.flat_map(|m| m.iter_unique())
39+
.unique()
40+
.sorted()
41+
.collect_vec();
42+
for pu in &all_parallel_units {
43+
let mut row = Row::new();
44+
row.add_cell((*pu).into());
45+
for mapping in mappings.values() {
46+
let mut weight = 0;
47+
for pu_id in mapping.iter() {
48+
if pu_id == *pu {
49+
weight += 1;
50+
}
51+
}
52+
row.add_cell(weight.into());
53+
}
54+
table.add_row(row);
55+
}
56+
println!("{table}");
57+
Ok(())
58+
}

src/ctl/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,9 @@ enum MetaCommands {
229229

230230
/// List all existing connections in the catalog
231231
ListConnections,
232+
233+
/// List fragment to parallel units mapping for serving
234+
ListServingFragmentMapping,
232235
}
233236

234237
pub async fn start(opts: CliOpts) -> Result<()> {
@@ -363,6 +366,9 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
363366
Commands::Meta(MetaCommands::ListConnections) => {
364367
cmd_impl::meta::list_connections(context).await?
365368
}
369+
Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
370+
cmd_impl::meta::list_serving_fragment_mappings(context).await?
371+
}
366372
Commands::Trace => cmd_impl::trace::trace(context).await?,
367373
Commands::Profile { sleep } => cmd_impl::profile::profile(context, sleep).await?,
368374
}

src/meta/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@
3737

3838
pub mod backup_restore;
3939
mod barrier;
40-
pub(crate) mod batch;
4140
#[cfg(not(madsim))] // no need in simulation test
4241
mod dashboard;
4342
mod error;
4443
pub mod hummock;
4544
pub mod manager;
4645
mod model;
4746
mod rpc;
47+
pub(crate) mod serving;
4848
pub mod storage;
4949
mod stream;
5050
pub(crate) mod telemetry;

src/meta/src/rpc/server.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
3636
use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
3737
use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
3838
use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
39+
use risingwave_pb::meta::serving_service_server::ServingServiceServer;
3940
use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
4041
use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
4142
use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
@@ -51,10 +52,10 @@ use super::intercept::MetricsMiddlewareLayer;
5152
use super::service::health_service::HealthServiceImpl;
5253
use super::service::notification_service::NotificationServiceImpl;
5354
use super::service::scale_service::ScaleServiceImpl;
55+
use super::service::serving_service::ServingServiceImpl;
5456
use super::DdlServiceImpl;
5557
use crate::backup_restore::BackupManager;
5658
use crate::barrier::{BarrierScheduler, GlobalBarrierManager};
57-
use crate::batch::ServingVnodeMapping;
5859
use crate::hummock::{CompactionScheduler, HummockManager};
5960
use crate::manager::{
6061
CatalogManager, ClusterManager, FragmentManager, IdleManager, MetaOpts, MetaSrvEnv,
@@ -72,10 +73,11 @@ use crate::rpc::service::stream_service::StreamServiceImpl;
7273
use crate::rpc::service::system_params_service::SystemParamsServiceImpl;
7374
use crate::rpc::service::telemetry_service::TelemetryInfoServiceImpl;
7475
use crate::rpc::service::user_service::UserServiceImpl;
76+
use crate::serving::ServingVnodeMapping;
7577
use crate::storage::{EtcdMetaStore, MemStore, MetaStore, WrappedEtcdClient as EtcdClient};
7678
use crate::stream::{GlobalStreamManager, SourceManager};
7779
use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
78-
use crate::{batch, hummock, MetaError, MetaResult};
80+
use crate::{hummock, serving, MetaError, MetaResult};
7981

8082
#[derive(Debug)]
8183
pub enum MetaStoreBackend {
@@ -358,7 +360,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
358360
.unwrap(),
359361
);
360362
let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
361-
batch::on_meta_start(
363+
serving::on_meta_start(
362364
env.notification_manager_ref(),
363365
cluster_manager.clone(),
364366
fragment_manager.clone(),
@@ -538,6 +540,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
538540
let backup_srv = BackupServiceImpl::new(backup_manager);
539541
let telemetry_srv = TelemetryInfoServiceImpl::new(meta_store.clone());
540542
let system_params_srv = SystemParamsServiceImpl::new(system_params_manager.clone());
543+
let serving_srv = ServingServiceImpl::new(serving_vnode_mapping.clone());
541544

542545
if let Some(prometheus_addr) = address_info.prometheus_addr {
543546
MetricsManager::boot_metrics_service(
@@ -580,7 +583,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
580583
sub_tasks.push(HummockManager::start_compaction_heartbeat(hummock_manager.clone()).await);
581584
sub_tasks.push(HummockManager::start_lsm_stat_report(hummock_manager).await);
582585
sub_tasks.push(
583-
batch::start_serving_vnode_mapping_worker(
586+
serving::start_serving_vnode_mapping_worker(
584587
env.notification_manager_ref(),
585588
cluster_manager.clone(),
586589
fragment_manager.clone(),
@@ -687,6 +690,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
687690
.add_service(BackupServiceServer::new(backup_srv))
688691
.add_service(SystemParamsServiceServer::new(system_params_srv))
689692
.add_service(TelemetryInfoServiceServer::new(telemetry_srv))
693+
.add_service(ServingServiceServer::new(serving_srv))
690694
.serve_with_shutdown(address_info.listen_addr, async move {
691695
tokio::select! {
692696
res = svc_shutdown_rx.changed() => {

src/meta/src/rpc/service/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub mod hummock_service;
2121
pub mod meta_member_service;
2222
pub mod notification_service;
2323
pub mod scale_service;
24+
pub mod serving_service;
2425
pub mod stream_service;
2526
pub mod system_params_service;
2627
pub mod telemetry_service;

src/meta/src/rpc/service/notification_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
2929
use tonic::{Request, Response, Status};
3030

3131
use crate::backup_restore::BackupManagerRef;
32-
use crate::batch::ServingVnodeMappingRef;
3332
use crate::hummock::HummockManagerRef;
3433
use crate::manager::{
3534
Catalog, CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, MetaSrvEnv, Notification,
3635
NotificationVersion, WorkerKey,
3736
};
37+
use crate::serving::ServingVnodeMappingRef;
3838
use crate::storage::MetaStore;
3939

4040
pub struct NotificationServiceImpl<S: MetaStore> {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use risingwave_pb::meta::serving_service_server::ServingService;
16+
use risingwave_pb::meta::{
17+
FragmentParallelUnitMapping, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse,
18+
};
19+
use tonic::{Request, Response, Status};
20+
21+
use crate::serving::ServingVnodeMappingRef;
22+
23+
pub struct ServingServiceImpl {
24+
serving_vnode_mapping: ServingVnodeMappingRef,
25+
}
26+
27+
impl ServingServiceImpl {
28+
pub fn new(serving_vnode_mapping: ServingVnodeMappingRef) -> Self {
29+
Self {
30+
serving_vnode_mapping,
31+
}
32+
}
33+
}
34+
35+
#[async_trait::async_trait]
36+
impl ServingService for ServingServiceImpl {
37+
async fn get_serving_vnode_mappings(
38+
&self,
39+
_request: Request<GetServingVnodeMappingsRequest>,
40+
) -> Result<Response<GetServingVnodeMappingsResponse>, Status> {
41+
let mappings = self
42+
.serving_vnode_mapping
43+
.all()
44+
.into_iter()
45+
.map(|(fragment_id, mapping)| FragmentParallelUnitMapping {
46+
fragment_id,
47+
mapping: Some(mapping.to_protobuf()),
48+
})
49+
.collect();
50+
Ok(Response::new(GetServingVnodeMappingsResponse { mappings }))
51+
}
52+
}
File renamed without changes.

src/rpc_client/src/meta_client.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use itertools::Itertools;
2727
use lru::LruCache;
2828
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId};
2929
use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE};
30+
use risingwave_common::hash::ParallelUnitMapping;
3031
use risingwave_common::system_param::reader::SystemParamsReader;
3132
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
3233
use risingwave_common::util::addr::HostAddr;
@@ -59,6 +60,7 @@ use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
5960
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
6061
use risingwave_pb::meta::reschedule_request::PbReschedule;
6162
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
63+
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
6264
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
6365
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
6466
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
@@ -896,6 +898,22 @@ impl MetaClient {
896898
let resp = self.inner.get_tables(req).await?;
897899
Ok(resp.tables)
898900
}
901+
902+
pub async fn list_serving_vnode_mappings(&self) -> Result<HashMap<u32, ParallelUnitMapping>> {
903+
let req = GetServingVnodeMappingsRequest {};
904+
let resp = self.inner.get_serving_vnode_mappings(req).await?;
905+
let mappings = resp
906+
.mappings
907+
.into_iter()
908+
.map(|p| {
909+
(
910+
p.fragment_id,
911+
ParallelUnitMapping::from_protobuf(p.mapping.as_ref().unwrap()),
912+
)
913+
})
914+
.collect();
915+
Ok(mappings)
916+
}
899917
}
900918

901919
#[async_trait]
@@ -1081,6 +1099,7 @@ struct GrpcMetaClientCore {
10811099
backup_client: BackupServiceClient<Channel>,
10821100
telemetry_client: TelemetryInfoServiceClient<Channel>,
10831101
system_params_client: SystemParamsServiceClient<Channel>,
1102+
serving_client: ServingServiceClient<Channel>,
10841103
}
10851104

10861105
impl GrpcMetaClientCore {
@@ -1096,7 +1115,8 @@ impl GrpcMetaClientCore {
10961115
let scale_client = ScaleServiceClient::new(channel.clone());
10971116
let backup_client = BackupServiceClient::new(channel.clone());
10981117
let telemetry_client = TelemetryInfoServiceClient::new(channel.clone());
1099-
let system_params_client = SystemParamsServiceClient::new(channel);
1118+
let system_params_client = SystemParamsServiceClient::new(channel.clone());
1119+
let serving_client = ServingServiceClient::new(channel);
11001120

11011121
GrpcMetaClientCore {
11021122
cluster_client,
@@ -1111,6 +1131,7 @@ impl GrpcMetaClientCore {
11111131
backup_client,
11121132
telemetry_client,
11131133
system_params_client,
1134+
serving_client,
11141135
}
11151136
}
11161137
}
@@ -1528,6 +1549,7 @@ macro_rules! for_all_meta_rpc {
15281549
,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
15291550
,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
15301551
,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
1552+
,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
15311553
}
15321554
};
15331555
}

0 commit comments

Comments
 (0)