diff --git a/Cargo.lock b/Cargo.lock index 4ccc8a0f3b0e0..eaa53fe0594ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6053,6 +6053,7 @@ dependencies = [ "arrow-schema", "async-trait", "auto_enums", + "await-tree", "chrono", "chrono-tz", "criterion", diff --git a/Cargo.toml b/Cargo.toml index cea9fe2f42691..b0d3b825893ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ license = "Apache-2.0" repository = "https://github.com/risingwavelabs/risingwave" [workspace.dependencies] +await-tree = "0.1.1" aws-config = { version = "0.51", default-features = false, features = ["rt-tokio", "native-tls"] } aws-sdk-kinesis = { version = "0.21", default-features = false, features = ["rt-tokio", "native-tls"] } aws-sdk-s3 = { version = "0.21", default-features = false, features = ["rt-tokio","native-tls"] } diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 8893fc81f220b..b03a3ae76f1af 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -420,12 +420,27 @@ pub struct FileCacheConfig { pub unrecognized: Unrecognized, } -#[derive(Debug, Default, Clone, ValueEnum, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Copy, ValueEnum, Serialize, Deserialize)] pub enum AsyncStackTraceOption { + /// Disabled. Off, - #[default] + /// Enabled with basic instruments. On, - Verbose, + /// Enabled with extra verbose instruments in release build. + /// Behaves the same as `on` in debug build due to performance concern. + #[default] + #[clap(alias = "verbose")] + ReleaseVerbose, +} + +impl AsyncStackTraceOption { + pub fn is_verbose(self) -> Option { + match self { + Self::Off => None, + Self::On => Some(false), + Self::ReleaseVerbose => Some(!cfg!(debug_assertions)), + } + } } serde_with::with_prefix!(streaming_prefix "stream_"); @@ -723,7 +738,7 @@ mod default { } pub fn async_stack_trace() -> AsyncStackTraceOption { - AsyncStackTraceOption::On + AsyncStackTraceOption::default() } pub fn unique_user_stream_errors() -> usize { diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index d6c034af8e820..f2299d66e4f9d 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -16,7 +16,7 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" async-trait = "0.1" -await-tree = "0.1.1" +await-tree = { workspace = true } clap = { version = "4", features = ["derive"] } either = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 889138c6239f4..200a3c60afa77 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -263,7 +263,7 @@ pub async fn compute_node_serve( let await_tree_config = match &config.streaming.async_stack_trace { AsyncStackTraceOption::Off => None, c => await_tree::ConfigBuilder::default() - .verbose(matches!(c, AsyncStackTraceOption::Verbose)) + .verbose(c.is_verbose().unwrap()) .build() .ok(), }; diff --git a/src/config/example.toml b/src/config/example.toml index 73ecb07755dc9..378435ba20c9f 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -37,7 +37,7 @@ batch_chunk_size = 1024 [streaming] in_flight_barrier_nums = 10000 enable_jaeger_tracing = false -async_stack_trace = "On" +async_stack_trace = "ReleaseVerbose" unique_user_stream_errors = 10 [streaming.developer] diff --git a/src/expr/Cargo.toml b/src/expr/Cargo.toml index 83ff4e28d3255..5ddc587b25361 100644 --- a/src/expr/Cargo.toml +++ b/src/expr/Cargo.toml @@ -21,6 +21,7 @@ arrow-array = "36" arrow-schema = "36" async-trait = "0.1" auto_enums = "0.8" +await-tree = { workspace = true } chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } chrono-tz = { version = "0.7", features = ["case-insensitive"] } ctor = "0.1" diff --git a/src/expr/src/expr/expr_udf.rs b/src/expr/src/expr/expr_udf.rs index 8c5cdbb1351da..14c7f46b4341a 100644 --- a/src/expr/src/expr/expr_udf.rs +++ b/src/expr/src/expr/expr_udf.rs @@ -17,6 +17,7 @@ use std::convert::TryFrom; use std::sync::{Arc, LazyLock, Mutex, Weak}; use arrow_schema::{Field, Schema, SchemaRef}; +use await_tree::InstrumentAwait; use risingwave_common::array::{ArrayImpl, ArrayRef, DataChunk}; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; @@ -35,6 +36,7 @@ pub struct UdfExpression { arg_schema: SchemaRef, client: Arc, identifier: String, + span: await_tree::Span, } #[cfg(not(madsim))] @@ -80,7 +82,11 @@ impl UdfExpression { let input = arrow_array::RecordBatch::try_new_with_options(self.arg_schema.clone(), columns, &opts) .expect("failed to build record batch"); - let output = self.client.call(&self.identifier, input).await?; + let output = self + .client + .call(&self.identifier, input) + .instrument_await(self.span.clone()) + .await?; if output.num_rows() != vis.len() { bail!( "UDF returned {} rows, but expected {}", @@ -121,6 +127,7 @@ impl<'a> TryFrom<&'a ExprNode> for UdfExpression { arg_schema, client, identifier: udf.identifier.clone(), + span: format!("expr_udf_call ({})", udf.identifier).into(), }) } } diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 259c56fb3459e..b6a9f04061661 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -10,7 +10,7 @@ repository = { workspace = true } [dependencies] async-trait = "0.1" -await-tree = "0.1.1" +await-tree = { workspace = true } aws-config = { workspace = true } aws-sdk-s3 = { version = "0.2.15", package = "madsim-aws-sdk-s3" } aws-smithy-http = { workspace = true } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 02accf4cd85b6..b18f5f826876c 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -17,7 +17,7 @@ normal = ["workspace-hack"] arc-swap = "1" async-trait = "0.1" auto_enums = { version = "0.8", features = ["futures03"] } -await-tree = "0.1.1" +await-tree = { workspace = true } bytes = { version = "1", features = ["serde"] } crossbeam = "0.8.1" dashmap = { version = "5", default-features = false } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 580a4b8a78681..ba364509f0fbb 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -185,7 +185,11 @@ impl LocalStateStore for MonitoredStateStore { .may_exist_duration .with_label_values(&[table_id_label.as_str()]) .start_timer(); - let res = self.inner.may_exist(key_range, read_options).await; + let res = self + .inner + .may_exist(key_range, read_options) + .verbose_instrument_await("store_may_exist") + .await; timer.observe_duration(); res } @@ -217,7 +221,9 @@ impl LocalStateStore for MonitoredStateStore { fn flush(&mut self, delete_ranges: Vec<(Bound, Bound)>) -> Self::FlushFuture<'_> { // TODO: collect metrics - self.inner.flush(delete_ranges) + self.inner + .flush(delete_ranges) + .verbose_instrument_await("store_flush") } fn epoch(&self) -> u64 { @@ -264,7 +270,7 @@ impl StateStore for MonitoredStateStore { let sync_result = self .inner .sync(epoch) - .instrument_await("store_await_sync") + .instrument_await("store_sync") .await .inspect_err(|e| error!("Failed in sync: {:?}", e))?; timer.observe_duration(); diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 4c46f33c55458..f25712ba65e4a 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -19,7 +19,7 @@ anyhow = "1" async-recursion = "1" async-stream = "0.3" async-trait = "0.1" -await-tree = "0.1.1" +await-tree = { workspace = true } bytes = "1" dyn-clone = "1" educe = "0.4" diff --git a/src/utils/runtime/Cargo.toml b/src/utils/runtime/Cargo.toml index 8c620e2b17c35..b5e7a013ca459 100644 --- a/src/utils/runtime/Cargo.toml +++ b/src/utils/runtime/Cargo.toml @@ -15,7 +15,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] -await-tree = "0.1.1" +await-tree = { workspace = true } console = "0.15" console-subscriber = "0.1.8" futures = { version = "0.3", default-features = false, features = ["alloc"] }