
feat(trace): enable await tree trace for compactor #10381


Merged · 2 commits · Jun 18, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions proto/monitor_service.proto
@@ -10,6 +10,7 @@ message StackTraceRequest {}
message StackTraceResponse {
map<uint32, string> actor_traces = 1;
map<string, string> rpc_traces = 2;
map<string, string> compaction_task_traces = 3;
}

message ProfilingRequest {
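For orientation, the new map field corresponds to a prost-generated struct roughly like the following. This is a sketch assuming standard prost map codegen; the actual generated code lives in `risingwave_pb`:

```rust
// Sketch (assumed from standard prost codegen for `map` fields) of the
// message this proto change produces. Proto3 maps default to empty, which
// is why existing callers can fill the field with `Default::default()`.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StackTraceResponse {
    #[prost(map = "uint32, string", tag = "1")]
    pub actor_traces: std::collections::HashMap<u32, String>,
    #[prost(map = "string, string", tag = "2")]
    pub rpc_traces: std::collections::HashMap<String, String>,
    /// New in this PR: keyed by compaction task, valued by its await-tree dump.
    #[prost(map = "string, string", tag = "3")]
    pub compaction_task_traces: std::collections::HashMap<String, String>,
}
```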
1 change: 1 addition & 0 deletions src/compute/src/rpc/service/monitor_service.rs
@@ -70,6 +70,7 @@ impl MonitorService for MonitorServiceImpl {
Ok(Response::new(StackTraceResponse {
actor_traces,
rpc_traces,
compaction_task_traces: Default::default(),
}))
}

1 change: 1 addition & 0 deletions src/compute/src/server.rs
@@ -224,6 +224,7 @@ pub async fn compute_node_serve(
output_memory_limiter,
sstable_object_id_manager: storage.sstable_object_id_manager().clone(),
task_progress_manager: Default::default(),
await_tree_reg: None,
});

let (handle, shutdown_sender) =
21 changes: 20 additions & 1 deletion src/ctl/src/cmd_impl/trace.rs
@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::WorkerType;
use risingwave_pb::monitor_service::StackTraceResponse;
use risingwave_rpc_client::ComputeClientPool;
use risingwave_rpc_client::{CompactorClient, ComputeClientPool};

use crate::CtlContext;

@@ -41,6 +41,7 @@ pub async fn trace(context: &CtlContext) -> anyhow::Result<()> {
let StackTraceResponse {
actor_traces,
rpc_traces,
..
Collaborator:

Will this PR also enable shared buffer compaction traces in CN? If yes, we can also include them here and display the traces via risectl.

Contributor Author:

> Will this PR also enable shared buffer compaction traces in CN?

No. Shall we?

Collaborator:

> > Will this PR also enable shared buffer compaction traces in CN?
>
> No. Shall we?

+1. But since the performance overhead is unknown and shared buffer compaction may have a bigger impact on streaming performance, we can leave it to a separate PR and verify whether the overhead is acceptable, as in #10144.

} = client.stack_trace().await?;

all_actor_traces.extend(actor_traces);
@@ -65,5 +66,23 @@ pub async fn trace(context: &CtlContext) -> anyhow::Result<()> {
}
}

let compactor_nodes = meta_client.list_worker_nodes(WorkerType::Compactor).await?;
let mut all_compaction_task_traces = BTreeMap::new();
for compactor in compactor_nodes {
let addr: HostAddr = compactor.get_host().unwrap().into();
let client = CompactorClient::new(addr).await?;
let StackTraceResponse {
compaction_task_traces,
..
} = client.stack_trace().await?;
all_compaction_task_traces.extend(compaction_task_traces);
}
if !all_compaction_task_traces.is_empty() {
println!("--- Compactor Traces ---");
for (key, trace) in all_compaction_task_traces {
println!(">> Compaction Task {key}\n{trace}");
}
}

Ok(())
}
48 changes: 48 additions & 0 deletions src/rpc_client/src/compactor_client.rs
@@ -0,0 +1,48 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use risingwave_common::util::addr::HostAddr;
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use tonic::transport::{Channel, Endpoint};

use crate::error::Result;

#[derive(Clone)]
pub struct CompactorClient {
pub monitor_client: MonitorServiceClient<Channel>,
}

impl CompactorClient {
pub async fn new(host_addr: HostAddr) -> Result<Self> {
let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
.connect_timeout(Duration::from_secs(5))
.connect()
.await?;
Ok(Self {
monitor_client: MonitorServiceClient::new(channel),
})
}

pub async fn stack_trace(&self) -> Result<StackTraceResponse> {
Ok(self
.monitor_client
.to_owned()
.stack_trace(StackTraceRequest::default())
.await?
.into_inner())
}
}
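As a usage sketch (the address is hypothetical; risectl resolves real ones via `list_worker_nodes`), the client can be driven like this. Note the `to_owned()` above: tonic-generated clients take `&mut self` per call, so `stack_trace` clones the `Channel`-backed client — cheap, since the channel is reference-counted — to stay callable through `&self`:

```rust
use risingwave_common::util::addr::HostAddr;
use risingwave_rpc_client::CompactorClient;

// Hypothetical standalone caller; host and port are illustrative only.
async fn print_compactor_traces() -> anyhow::Result<()> {
    let addr = HostAddr {
        host: "127.0.0.1".to_owned(),
        port: 6660,
    };
    let client = CompactorClient::new(addr).await?;
    // Each map entry is one in-flight compaction task's await-tree dump.
    for (task, trace) in client.stack_trace().await?.compaction_task_traces {
        println!(">> Compaction Task {task}\n{trace}");
    }
    Ok(())
}
```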
2 changes: 2 additions & 0 deletions src/rpc_client/src/lib.rs
@@ -50,8 +50,10 @@ mod connector_client;
mod hummock_meta_client;
mod meta_client;
// mod sink_client;
mod compactor_client;
mod stream_client;

pub use compactor_client::CompactorClient;
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
pub use connector_client::ConnectorClient;
pub use hummock_meta_client::{CompactTaskItem, HummockMetaClient};
2 changes: 2 additions & 0 deletions src/storage/compactor/Cargo.toml
@@ -17,7 +17,9 @@ normal = ["workspace-hack"]
[dependencies]
anyhow = "1"
async-trait = "0.1"
await-tree = { workspace = true }
clap = { version = "4", features = ["derive"] }
parking_lot = "0.12"
prometheus = { version = "0.13" }
risingwave_common = { path = "../../common" }
risingwave_common_service = { path = "../../common/common_service" }
7 changes: 6 additions & 1 deletion src/storage/compactor/src/lib.rs
@@ -18,7 +18,7 @@ mod server;
mod telemetry;

use clap::Parser;
use risingwave_common::config::OverrideConfig;
use risingwave_common::config::{AsyncStackTraceOption, OverrideConfig};

use crate::server::compactor_serve;

@@ -83,6 +83,11 @@ struct OverrideConfigOpts {
#[clap(long, env = "RW_MAX_CONCURRENT_TASK_NUMBER")]
#[override_opts(path = storage.max_concurrent_compaction_task_number)]
pub max_concurrent_task_number: Option<u64>,

/// Enable async stack tracing through `await-tree` for risectl.
#[clap(long, env = "RW_ASYNC_STACK_TRACE", value_enum)]
#[override_opts(path = streaming.async_stack_trace)]
pub async_stack_trace: Option<AsyncStackTraceOption>,
}

use std::future::Future;
49 changes: 49 additions & 0 deletions src/storage/compactor/src/rpc.rs
@@ -12,8 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;
use risingwave_pb::compactor::compactor_service_server::CompactorService;
use risingwave_pb::compactor::{EchoRequest, EchoResponse};
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::{
ProfilingRequest, ProfilingResponse, StackTraceRequest, StackTraceResponse,
};
use tonic::{Request, Response, Status};

#[derive(Default)]
@@ -25,3 +33,44 @@ impl CompactorService for CompactorServiceImpl {
Ok(Response::new(EchoResponse {}))
}
}

pub struct MonitorServiceImpl {
await_tree_reg: Option<Arc<RwLock<await_tree::Registry<String>>>>,
}

impl MonitorServiceImpl {
pub fn new(await_tree_reg: Option<Arc<RwLock<await_tree::Registry<String>>>>) -> Self {
Self { await_tree_reg }
}
}

#[async_trait::async_trait]
impl MonitorService for MonitorServiceImpl {
async fn stack_trace(
&self,
_request: Request<StackTraceRequest>,
) -> Result<Response<StackTraceResponse>, Status> {
let compaction_task_traces = match &self.await_tree_reg {
None => HashMap::default(),
Some(await_tree_reg) => await_tree_reg
.read()
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
};
Ok(Response::new(StackTraceResponse {
actor_traces: Default::default(),
rpc_traces: Default::default(),
compaction_task_traces,
}))
}

async fn profiling(
&self,
_request: Request<ProfilingRequest>,
) -> Result<Response<ProfilingResponse>, Status> {
Err(Status::unimplemented(
"profiling unimplemented in compactor",
))
}
}
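The service above only reads the registry; for `compaction_task_traces` to be non-empty, each compaction task future has to be registered somewhere in the compactor. A rough sketch of that pattern (the key format, span names, and helper are illustrative, not the exact registration site in this PR):

```rust
use std::sync::Arc;

use await_tree::InstrumentAwait;
use parking_lot::RwLock;

/// Run a compaction task, registering its root future in the await-tree
/// registry when tracing is enabled so `stack_trace` can report it.
async fn run_instrumented(
    reg: Option<Arc<RwLock<await_tree::Registry<String>>>>,
    task_id: u64,
) {
    let task = async move {
        // Every annotated await becomes a node in the dumped tree.
        upload_sst().instrument_await("upload_sst").await;
    };
    match reg {
        Some(reg) => {
            // `stack_trace` later iterates the registry and stringifies
            // each registered tree under its key.
            let root = reg
                .write()
                .register(task_id.to_string(), format!("Compaction Task {task_id}"));
            root.instrument(task).await
        }
        None => task.await,
    }
}

async fn upload_sst() { /* I/O elided */ }
```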
23 changes: 20 additions & 3 deletions src/storage/compactor/src/server.rs
@@ -16,7 +16,10 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use risingwave_common::config::{extract_storage_memory_config, load_config};
use parking_lot::RwLock;
use risingwave_common::config::{
extract_storage_memory_config, load_config, AsyncStackTraceOption,
};
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::telemetry::manager::TelemetryManager;
@@ -29,6 +32,7 @@ use risingwave_common_service::observer_manager::ObserverManager;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::common::WorkerType;
use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_rpc_client::MetaClient;
use risingwave_storage::filter_key_extractor::{FilterKeyExtractorManager, RemoteTableAccessor};
use risingwave_storage::hummock::compactor::{CompactionExecutor, CompactorContext};
@@ -45,7 +49,7 @@ use tokio::task::JoinHandle;
use tracing::info;

use super::compactor_observer::observer_manager::CompactorObserverNode;
use crate::rpc::CompactorServiceImpl;
use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
use crate::telemetry::CompactorTelemetryCreator;
use crate::CompactorOpts;

@@ -174,6 +178,15 @@ pub async fn compactor_serve(
hummock_meta_client.clone(),
storage_opts.sstable_id_remote_fetch_number,
));
let await_tree_config = match &config.streaming.async_stack_trace {
AsyncStackTraceOption::Off => None,
c => await_tree::ConfigBuilder::default()
.verbose(c.is_verbose().unwrap())
.build()
.ok(),
};
let await_tree_reg =
await_tree_config.map(|c| Arc::new(RwLock::new(await_tree::Registry::new(c))));
let compactor_context = Arc::new(CompactorContext {
storage_opts,
hummock_meta_client: hummock_meta_client.clone(),
@@ -187,6 +200,7 @@
output_memory_limiter,
sstable_object_id_manager: sstable_object_id_manager.clone(),
task_progress_manager: Default::default(),
await_tree_reg: await_tree_reg.clone(),
});
let mut sub_tasks = vec![
MetaClient::start_heartbeat_loop(
@@ -216,10 +230,13 @@
tracing::info!("Telemetry didn't start due to config");
}

let compactor_srv = CompactorServiceImpl::default();
let monitor_srv = MonitorServiceImpl::new(await_tree_reg);
let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(CompactorServiceServer::new(CompactorServiceImpl::default()))
.add_service(CompactorServiceServer::new(compactor_srv))
.add_service(MonitorServiceServer::new(monitor_srv))
.serve_with_shutdown(listen_addr, async move {
tokio::select! {
_ = tokio::signal::ctrl_c() => {},
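One subtlety in the `await_tree_config` match above: `c.is_verbose().unwrap()` cannot panic because the `Off` arm is peeled away first. A self-contained sketch of the assumed option semantics (mirroring `risingwave_common::config::AsyncStackTraceOption`, not copied from it):

```rust
// Assumed shape of the config option: only `Off` maps to `None`, so the
// unwrap is safe once the `Off` arm has already been matched.
#[derive(Clone, Copy, Debug)]
enum AsyncStackTraceOption {
    Off,
    On,
    Verbose,
}

impl AsyncStackTraceOption {
    fn is_verbose(self) -> Option<bool> {
        match self {
            Self::Off => None,
            Self::On => Some(false),
            Self::Verbose => Some(true),
        }
    }
}
```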
1 change: 1 addition & 0 deletions src/storage/hummock_test/src/compactor_tests.rs
@@ -191,6 +191,7 @@ pub(crate) mod tests {
options.sstable_id_remote_fetch_number,
)),
task_progress_manager: Default::default(),
await_tree_reg: None,
}
}

4 changes: 4 additions & 0 deletions src/storage/src/hummock/compactor/context.rs
@@ -14,6 +14,7 @@

use std::sync::Arc;

use parking_lot::RwLock;
use risingwave_rpc_client::HummockMetaClient;

use super::task_progress::TaskProgressManagerRef;
@@ -51,6 +52,8 @@ pub struct CompactorContext {
pub sstable_object_id_manager: SstableObjectIdManagerRef,

pub task_progress_manager: TaskProgressManagerRef,

pub await_tree_reg: Option<Arc<RwLock<await_tree::Registry<String>>>>,
}

impl CompactorContext {
@@ -83,6 +86,7 @@
output_memory_limiter: memory_limiter,
sstable_object_id_manager,
task_progress_manager: Default::default(),
await_tree_reg: None,
}
}
}
5 changes: 4 additions & 1 deletion src/storage/src/hummock/compactor/iterator.rs
@@ -19,6 +19,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::{atomic, Arc};
use std::time::Instant;

use await_tree::InstrumentAwait;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
@@ -127,7 +128,7 @@
/// `self.block_iter` to `None`.
async fn next_block(&mut self) -> HummockResult<()> {
// Check if we want and if we can load the next block.
if self.remaining_blocks > 0 && let Some(block) = self.download_next_block().await? {
if self.remaining_blocks > 0 && let Some(block) = self.download_next_block().verbose_instrument_await("stream_iter_next_block").await? {
let mut block_iter = BlockIterator::new(BlockHolder::from_owned_block(block));
block_iter.seek_to_first();

@@ -280,6 +281,7 @@ impl ConcatSstableIterator {
let sstable = self
.sstable_store
.sstable(table_info, &mut self.stats)
.verbose_instrument_await("stream_iter_sstable")
.await?;
let stats_ptr = self.stats.remote_io_time.clone();
let now = Instant::now();
@@ -323,6 +325,7 @@
let block_stream = self
.sstable_store
.get_stream(sstable.value(), Some(start_index))
.verbose_instrument_await("stream_iter_get_stream")
.await?;

// Determine time needed to open stream.
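The iterator spans above all use `verbose_instrument_await`, so — assuming await-tree's usual semantics — they are recorded only when the registry was built with `verbose(true)`, i.e. when the new option is set to its verbose level; plain `instrument_await` spans are recorded regardless. A sketch of the difference:

```rust
use await_tree::InstrumentAwait;

async fn fetch_block() { /* I/O elided */ }

async fn example() {
    // Always appears in the dumped tree.
    fetch_block().instrument_await("fetch_block").await;
    // Appears only when the registry's config enables verbose mode,
    // keeping hot paths cheap by default.
    fetch_block().verbose_instrument_await("fetch_block_detail").await;
}
```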