Skip to content

Commit 90c3df9

Browse files
zwang28, lmatz
authored and committed
feat(trace): enable await tree trace for compactor (#10381)
1 parent 6ce0abd commit 90c3df9

File tree

17 files changed

+206
-17
lines changed

17 files changed

+206
-17
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/monitor_service.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ message StackTraceRequest {}
1010
// Response of the stack-trace RPC: rendered traces, grouped by what they describe.
message StackTraceResponse {
1111
// Traces keyed by a uint32 id (presumably the actor id — confirm against the producer).
map<uint32, string> actor_traces = 1;
1212
// Traces keyed by a string (presumably identifying the in-flight RPC — confirm).
map<string, string> rpc_traces = 2;
13+
// Traces of compaction tasks. Filled in by compactor nodes; compute nodes leave it empty.
map<string, string> compaction_task_traces = 3;
1314
}
1415

1516
message ProfilingRequest {

src/compute/src/rpc/service/monitor_service.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ impl MonitorService for MonitorServiceImpl {
7070
Ok(Response::new(StackTraceResponse {
7171
actor_traces,
7272
rpc_traces,
73+
compaction_task_traces: Default::default(),
7374
}))
7475
}
7576

src/compute/src/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ pub async fn compute_node_serve(
234234
output_memory_limiter,
235235
sstable_object_id_manager: storage.sstable_object_id_manager().clone(),
236236
task_progress_manager: Default::default(),
237+
await_tree_reg: None,
237238
});
238239

239240
let (handle, shutdown_sender) =

src/ctl/src/cmd_impl/trace.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
1717
use risingwave_common::util::addr::HostAddr;
1818
use risingwave_pb::common::WorkerType;
1919
use risingwave_pb::monitor_service::StackTraceResponse;
20-
use risingwave_rpc_client::ComputeClientPool;
20+
use risingwave_rpc_client::{CompactorClient, ComputeClientPool};
2121

2222
use crate::CtlContext;
2323

@@ -41,6 +41,7 @@ pub async fn trace(context: &CtlContext) -> anyhow::Result<()> {
4141
let StackTraceResponse {
4242
actor_traces,
4343
rpc_traces,
44+
..
4445
} = client.stack_trace().await?;
4546

4647
all_actor_traces.extend(actor_traces);
@@ -65,5 +66,23 @@ pub async fn trace(context: &CtlContext) -> anyhow::Result<()> {
6566
}
6667
}
6768

69+
let compactor_nodes = meta_client.list_worker_nodes(WorkerType::Compactor).await?;
70+
let mut all_compaction_task_traces = BTreeMap::new();
71+
for compactor in compactor_nodes {
72+
let addr: HostAddr = compactor.get_host().unwrap().into();
73+
let client = CompactorClient::new(addr).await?;
74+
let StackTraceResponse {
75+
compaction_task_traces,
76+
..
77+
} = client.stack_trace().await?;
78+
all_compaction_task_traces.extend(compaction_task_traces);
79+
}
80+
if !all_compaction_task_traces.is_empty() {
81+
println!("--- Compactor Traces ---");
82+
for (key, trace) in all_compaction_task_traces {
83+
println!(">> Compaction Task {key}\n{trace}");
84+
}
85+
}
86+
6887
Ok(())
6988
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::time::Duration;
16+
17+
use risingwave_common::util::addr::HostAddr;
18+
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
19+
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
20+
use tonic::transport::{Channel, Endpoint};
21+
22+
use crate::error::Result;
23+
24+
#[derive(Clone)]
25+
pub struct CompactorClient {
26+
pub monitor_client: MonitorServiceClient<Channel>,
27+
}
28+
29+
impl CompactorClient {
30+
pub async fn new(host_addr: HostAddr) -> Result<Self> {
31+
let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
32+
.connect_timeout(Duration::from_secs(5))
33+
.connect()
34+
.await?;
35+
Ok(Self {
36+
monitor_client: MonitorServiceClient::new(channel),
37+
})
38+
}
39+
40+
pub async fn stack_trace(&self) -> Result<StackTraceResponse> {
41+
Ok(self
42+
.monitor_client
43+
.to_owned()
44+
.stack_trace(StackTraceRequest::default())
45+
.await?
46+
.into_inner())
47+
}
48+
}

src/rpc_client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@ mod connector_client;
5050
mod hummock_meta_client;
5151
mod meta_client;
5252
// mod sink_client;
53+
mod compactor_client;
5354
mod stream_client;
5455

56+
pub use compactor_client::CompactorClient;
5557
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
5658
pub use connector_client::ConnectorClient;
5759
pub use hummock_meta_client::{CompactTaskItem, HummockMetaClient};

src/storage/compactor/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ normal = ["workspace-hack"]
1717
[dependencies]
1818
anyhow = "1"
1919
async-trait = "0.1"
20+
await-tree = { workspace = true }
2021
clap = { version = "4", features = ["derive"] }
22+
parking_lot = "0.12"
2123
prometheus = { version = "0.13" }
2224
risingwave_common = { path = "../../common" }
2325
risingwave_common_service = { path = "../../common/common_service" }

src/storage/compactor/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ mod server;
1818
mod telemetry;
1919

2020
use clap::Parser;
21-
use risingwave_common::config::OverrideConfig;
21+
use risingwave_common::config::{AsyncStackTraceOption, OverrideConfig};
2222

2323
use crate::server::compactor_serve;
2424

@@ -83,6 +83,11 @@ struct OverrideConfigOpts {
8383
#[clap(long, env = "RW_MAX_CONCURRENT_TASK_NUMBER")]
8484
#[override_opts(path = storage.max_concurrent_compaction_task_number)]
8585
pub max_concurrent_task_number: Option<u64>,
86+
87+
/// Enable async stack tracing through `await-tree` for risectl.
88+
#[clap(long, env = "RW_ASYNC_STACK_TRACE", value_enum)]
89+
#[override_opts(path = streaming.async_stack_trace)]
90+
pub async_stack_trace: Option<AsyncStackTraceOption>,
8691
}
8792

8893
use std::future::Future;

src/storage/compactor/src/rpc.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
16+
use std::sync::Arc;
17+
18+
use parking_lot::RwLock;
1519
use risingwave_pb::compactor::compactor_service_server::CompactorService;
1620
use risingwave_pb::compactor::{EchoRequest, EchoResponse};
21+
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
22+
use risingwave_pb::monitor_service::{
23+
ProfilingRequest, ProfilingResponse, StackTraceRequest, StackTraceResponse,
24+
};
1725
use tonic::{Request, Response, Status};
1826

1927
#[derive(Default)]
@@ -25,3 +33,44 @@ impl CompactorService for CompactorServiceImpl {
2533
Ok(Response::new(EchoResponse {}))
2634
}
2735
}
36+
37+
pub struct MonitorServiceImpl {
38+
await_tree_reg: Option<Arc<RwLock<await_tree::Registry<String>>>>,
39+
}
40+
41+
impl MonitorServiceImpl {
42+
pub fn new(await_tree_reg: Option<Arc<RwLock<await_tree::Registry<String>>>>) -> Self {
43+
Self { await_tree_reg }
44+
}
45+
}
46+
47+
#[async_trait::async_trait]
48+
impl MonitorService for MonitorServiceImpl {
49+
async fn stack_trace(
50+
&self,
51+
_request: Request<StackTraceRequest>,
52+
) -> Result<Response<StackTraceResponse>, Status> {
53+
let compaction_task_traces = match &self.await_tree_reg {
54+
None => HashMap::default(),
55+
Some(await_tree_reg) => await_tree_reg
56+
.read()
57+
.iter()
58+
.map(|(k, v)| (k.to_string(), v.to_string()))
59+
.collect(),
60+
};
61+
Ok(Response::new(StackTraceResponse {
62+
actor_traces: Default::default(),
63+
rpc_traces: Default::default(),
64+
compaction_task_traces,
65+
}))
66+
}
67+
68+
async fn profiling(
69+
&self,
70+
_request: Request<ProfilingRequest>,
71+
) -> Result<Response<ProfilingResponse>, Status> {
72+
Err(Status::unimplemented(
73+
"profiling unimplemented in compactor",
74+
))
75+
}
76+
}

src/storage/compactor/src/server.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ use std::net::SocketAddr;
1616
use std::sync::Arc;
1717
use std::time::Duration;
1818

19-
use risingwave_common::config::{extract_storage_memory_config, load_config};
19+
use parking_lot::RwLock;
20+
use risingwave_common::config::{
21+
extract_storage_memory_config, load_config, AsyncStackTraceOption,
22+
};
2023
use risingwave_common::monitor::process_linux::monitor_process;
2124
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
2225
use risingwave_common::telemetry::manager::TelemetryManager;
@@ -29,6 +32,7 @@ use risingwave_common_service::observer_manager::ObserverManager;
2932
use risingwave_object_store::object::parse_remote_object_store;
3033
use risingwave_pb::common::WorkerType;
3134
use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
35+
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
3236
use risingwave_rpc_client::MetaClient;
3337
use risingwave_storage::filter_key_extractor::{FilterKeyExtractorManager, RemoteTableAccessor};
3438
use risingwave_storage::hummock::compactor::{CompactionExecutor, CompactorContext};
@@ -45,7 +49,7 @@ use tokio::task::JoinHandle;
4549
use tracing::info;
4650

4751
use super::compactor_observer::observer_manager::CompactorObserverNode;
48-
use crate::rpc::CompactorServiceImpl;
52+
use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
4953
use crate::telemetry::CompactorTelemetryCreator;
5054
use crate::CompactorOpts;
5155

@@ -173,6 +177,15 @@ pub async fn compactor_serve(
173177
hummock_meta_client.clone(),
174178
storage_opts.sstable_id_remote_fetch_number,
175179
));
180+
let await_tree_config = match &config.streaming.async_stack_trace {
181+
AsyncStackTraceOption::Off => None,
182+
c => await_tree::ConfigBuilder::default()
183+
.verbose(c.is_verbose().unwrap())
184+
.build()
185+
.ok(),
186+
};
187+
let await_tree_reg =
188+
await_tree_config.map(|c| Arc::new(RwLock::new(await_tree::Registry::new(c))));
176189
let compactor_context = Arc::new(CompactorContext {
177190
storage_opts,
178191
hummock_meta_client: hummock_meta_client.clone(),
@@ -186,6 +199,7 @@ pub async fn compactor_serve(
186199
output_memory_limiter,
187200
sstable_object_id_manager: sstable_object_id_manager.clone(),
188201
task_progress_manager: Default::default(),
202+
await_tree_reg: await_tree_reg.clone(),
189203
});
190204
let mut sub_tasks = vec![
191205
MetaClient::start_heartbeat_loop(
@@ -215,10 +229,13 @@ pub async fn compactor_serve(
215229
tracing::info!("Telemetry didn't start due to config");
216230
}
217231

232+
let compactor_srv = CompactorServiceImpl::default();
233+
let monitor_srv = MonitorServiceImpl::new(await_tree_reg);
218234
let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel();
219235
let join_handle = tokio::spawn(async move {
220236
tonic::transport::Server::builder()
221-
.add_service(CompactorServiceServer::new(CompactorServiceImpl::default()))
237+
.add_service(CompactorServiceServer::new(compactor_srv))
238+
.add_service(MonitorServiceServer::new(monitor_srv))
222239
.serve_with_shutdown(listen_addr, async move {
223240
tokio::select! {
224241
_ = tokio::signal::ctrl_c() => {},

src/storage/hummock_test/src/compactor_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ pub(crate) mod tests {
191191
options.sstable_id_remote_fetch_number,
192192
)),
193193
task_progress_manager: Default::default(),
194+
await_tree_reg: None,
194195
}
195196
}
196197

src/storage/src/hummock/compactor/context.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use parking_lot::RwLock;
1718
use risingwave_rpc_client::HummockMetaClient;
1819

1920
use super::task_progress::TaskProgressManagerRef;
@@ -51,6 +52,8 @@ pub struct CompactorContext {
5152
pub sstable_object_id_manager: SstableObjectIdManagerRef,
5253

5354
pub task_progress_manager: TaskProgressManagerRef,
55+
56+
pub await_tree_reg: Option<Arc<RwLock<await_tree::Registry<String>>>>,
5457
}
5558

5659
impl CompactorContext {
@@ -83,6 +86,7 @@ impl CompactorContext {
8386
output_memory_limiter: memory_limiter,
8487
sstable_object_id_manager,
8588
task_progress_manager: Default::default(),
89+
await_tree_reg: None,
8690
}
8791
}
8892
}

src/storage/src/hummock/compactor/iterator.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::atomic::AtomicU64;
1919
use std::sync::{atomic, Arc};
2020
use std::time::Instant;
2121

22+
use await_tree::InstrumentAwait;
2223
use risingwave_hummock_sdk::compaction_group::StateTableId;
2324
use risingwave_hummock_sdk::key::FullKey;
2425
use risingwave_hummock_sdk::key_range::KeyRange;
@@ -127,7 +128,7 @@ impl SstableStreamIterator {
127128
/// `self.block_iter` to `None`.
128129
async fn next_block(&mut self) -> HummockResult<()> {
129130
// Check if we want and if we can load the next block.
130-
if self.remaining_blocks > 0 && let Some(block) = self.download_next_block().await? {
131+
if self.remaining_blocks > 0 && let Some(block) = self.download_next_block().verbose_instrument_await("stream_iter_next_block").await? {
131132
let mut block_iter = BlockIterator::new(BlockHolder::from_owned_block(block));
132133
block_iter.seek_to_first();
133134

@@ -280,6 +281,7 @@ impl ConcatSstableIterator {
280281
let sstable = self
281282
.sstable_store
282283
.sstable(table_info, &mut self.stats)
284+
.verbose_instrument_await("stream_iter_sstable")
283285
.await?;
284286
let stats_ptr = self.stats.remote_io_time.clone();
285287
let now = Instant::now();
@@ -323,6 +325,7 @@ impl ConcatSstableIterator {
323325
let block_stream = self
324326
.sstable_store
325327
.get_stream(sstable.value(), Some(start_index))
328+
.verbose_instrument_await("stream_iter_get_stream")
326329
.await?;
327330

328331
// Determine time needed to open stream.

0 commit comments

Comments
 (0)