refactor(metrics): support streaming metrics level #11994

Merged
13 commits merged on Sep 5, 2023
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -1071,14 +1071,14 @@ def section_streaming_actors(outer_panels):
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('stream_join_barrier_align_duration_bucket')}[$__rate_interval])) by (le, actor_id, wait_side, job, instance))",
f"p{legend} {{{{actor_id}}}}.{{{{wait_side}}}} - {{{{job}}}} @ {{{{instance}}}}",
f"histogram_quantile({quantile}, sum(rate({metric('stream_join_barrier_align_duration_bucket')}[$__rate_interval])) by (le, fragment_id, wait_side, job, instance))",
f"p{legend} - fragment {{{{fragment_id}}}} {{{{wait_side}}}} - {{{{job}}}} @ {{{{instance}}}}",
),
[90, 99, 999, "max"],
),
panels.target(
f"sum by(le, actor_id, wait_side, job, instance)(rate({metric('stream_join_barrier_align_duration_sum')}[$__rate_interval])) / sum by(le,actor_id,wait_side,job,instance) (rate({metric('stream_join_barrier_align_duration_count')}[$__rate_interval]))",
"avg {{actor_id}}.{{wait_side}} - {{job}} @ {{instance}}",
f"sum by(le, fragment_id, wait_side, job, instance)(rate({metric('stream_join_barrier_align_duration_sum')}[$__rate_interval])) / sum by(le,fragment_id,wait_side,job,instance) (rate({metric('stream_join_barrier_align_duration_count')}[$__rate_interval]))",
"avg - fragment {{fragment_id}} {{wait_side}} - {{job}} @ {{instance}}",
),
],
),
@@ -1135,14 +1135,14 @@ def section_streaming_actors(outer_panels):
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('stream_join_matched_join_keys_bucket')}[$__rate_interval])) by (le, actor_id, table_id, job, instance))",
f"p{legend} - actor_id {{{{actor_id}}}} table_id {{{{table_id}}}} - {{{{job}}}} @ {{{{instance}}}}",
f"histogram_quantile({quantile}, sum(rate({metric('stream_join_matched_join_keys_bucket')}[$__rate_interval])) by (le, fragment_id, table_id, job, instance))",
f"p{legend} - fragment {{{{fragment_id}}}} table_id {{{{table_id}}}} - {{{{job}}}} @ {{{{instance}}}}",
),
[90, 99, "max"],
),
panels.target(
f"sum by(le, job, instance, actor_id, table_id) (rate({metric('stream_join_matched_join_keys_sum')}[$__rate_interval])) / sum by(le, job, instance, actor_id, table_id) (rate({table_metric('stream_join_matched_join_keys_count')}[$__rate_interval]))",
"avg - actor_id {{actor_id}} table_id {{table_id}} - {{job}} @ {{instance}}",
f"sum by(le, job, instance, actor_id, table_id) (rate({metric('stream_join_matched_join_keys_sum')}[$__rate_interval])) / sum by(le, job, instance, fragment_id, table_id) (rate({table_metric('stream_join_matched_join_keys_count')}[$__rate_interval]))",
"avg - fragment {{fragment_id}} table_id {{table_id}} - {{job}} @ {{instance}}",
),
],
),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

23 changes: 15 additions & 8 deletions src/common/src/config.rs
@@ -420,26 +420,29 @@ pub struct StreamingConfig {
#[serde(default = "default::streaming::unique_user_stream_errors")]
pub unique_user_stream_errors: usize,

#[serde(default = "default::streaming::streaming_metric_level")]
pub streaming_metric_level: MetricLevel,

#[serde(default, flatten)]
pub unrecognized: Unrecognized<Self>,
}

#[derive(Debug, Default, Clone, Copy, ValueEnum, Serialize, Deserialize)]
pub enum StorageMetricLevel {
pub enum MetricLevel {
#[default]
Disabled = 0,
Critical = 1,
Info = 2,
Debug = 3,
}

impl PartialEq<Self> for StorageMetricLevel {
impl PartialEq<Self> for MetricLevel {
fn eq(&self, other: &Self) -> bool {
(*self as u8).eq(&(*other as u8))
}
}

impl PartialOrd for StorageMetricLevel {
impl PartialOrd for MetricLevel {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
(*self as u8).partial_cmp(&(*other as u8))
}
@@ -567,7 +570,7 @@ pub struct StorageConfig {
pub compactor_max_sst_size: u64,

#[serde(default = "default::storage::storage_metric_level")]
pub storage_metric_level: StorageMetricLevel,
pub storage_metric_level: MetricLevel,

#[serde(default, flatten)]
pub unrecognized: Unrecognized<Self>,
@@ -938,7 +941,7 @@ pub mod default {
}

pub mod storage {
use crate::config::StorageMetricLevel;
use crate::config::MetricLevel;

pub fn share_buffers_sync_parallelism() -> u32 {
1
@@ -1045,13 +1048,13 @@ pub mod default {
512 * 1024 * 1024 // 512m
}

pub fn storage_metric_level() -> StorageMetricLevel {
StorageMetricLevel::Info
pub fn storage_metric_level() -> MetricLevel {
MetricLevel::Info
}
}

pub mod streaming {
use crate::config::AsyncStackTraceOption;
use crate::config::{AsyncStackTraceOption, MetricLevel};

pub fn in_flight_barrier_nums() -> usize {
// quick fix
@@ -1066,6 +1069,10 @@ pub mod default {
pub fn unique_user_stream_errors() -> usize {
10
}

pub fn streaming_metric_level() -> MetricLevel {
MetricLevel::Info
}
}

pub mod file_cache {
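To illustrate the new `streaming_metric_level` knob, here is a small standalone sketch (not part of the PR) of how the deserialized `MetricLevel` behaves once parsed from the `[streaming]` section. `MiniStreamingConfig` and the `toml` dependency are assumptions made for the example; only `MetricLevel` itself comes from the diff above.

// Illustrative only: a cut-down config struct, not the real StreamingConfig.
use risingwave_common::config::MetricLevel;
use serde::Deserialize;

#[derive(Deserialize)]
struct MiniStreamingConfig {
    // Mirrors the new field; falls back to MetricLevel::default() (Disabled) here,
    // whereas the PR's default helper returns MetricLevel::Info.
    #[serde(default)]
    streaming_metric_level: MetricLevel,
}

fn main() {
    let cfg: MiniStreamingConfig =
        toml::from_str(r#"streaming_metric_level = "Info""#).unwrap();

    // Ordering follows the u8 discriminants: Disabled < Critical < Info < Debug,
    // so Debug-level (per-actor) metrics stay relabeled under the Info default.
    assert!(cfg.streaming_metric_level > MetricLevel::Critical);
    assert!(cfg.streaming_metric_level < MetricLevel::Debug);
}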
3 changes: 3 additions & 0 deletions src/common/src/metrics.rs
@@ -25,6 +25,9 @@ use tracing_subscriber::Layer;

use crate::monitor::GLOBAL_METRICS_REGISTRY;

mod relabeled_metric;
pub use relabeled_metric::*;

#[derive(Debug)]
pub struct TrAdderAtomic(TrAdder<i64>);

src/common/src/metrics/relabeled_metric.rs
@@ -14,7 +14,8 @@

use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
use prometheus::{Histogram, HistogramVec};
use risingwave_common::config::StorageMetricLevel;

use crate::config::MetricLevel;

/// For all `Relabeled*Vec` below,
/// - when `metric_level` <= `relabel_threshold`, they behave exactly the same as their inner
@@ -26,58 +27,105 @@ use risingwave_common::config::StorageMetricLevel;
/// We could have used a single struct to represent all `MetricVec<T: MetricVecBuilder>`, rather
/// than specializing them one by one. However, that is not possible because the prometheus crate
/// doesn't export `MetricVecBuilder` implementations such as `HistogramVecBuilder`.

#[derive(Clone, Debug)]
pub struct RelabeledHistogramVec {
relabel_threshold: StorageMetricLevel,
metric_level: StorageMetricLevel,
relabel_threshold: MetricLevel,
metric_level: MetricLevel,
metric: HistogramVec,

/// The first `relabel_num` labels will be relabeled to empty string
///
/// For example, if `relabel_num` is 1, and the input labels are `["actor_id",
/// "fragment_id", "table_id"]`, when threshold is reached, the label values will be
/// `["", "<original_fragment_id>", "<original_table_id>"]`.
relabel_num: usize,
}

impl RelabeledHistogramVec {
pub fn with_metric_level(
metric_level: StorageMetricLevel,
metric_level: MetricLevel,
metric: HistogramVec,
relabel_threshold: StorageMetricLevel,
relabel_threshold: MetricLevel,
) -> Self {
Self {
relabel_threshold,
metric_level,
metric,
relabel_num: usize::MAX,
}
}

pub fn with_metric_level_relabel_n(
metric_level: MetricLevel,
metric: HistogramVec,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> Self {
Self {
relabel_threshold,
metric_level,
metric,
relabel_num,
}
}

pub fn with_label_values(&self, vals: &[&str]) -> Histogram {
if self.metric_level > self.relabel_threshold {
return self.metric.with_label_values(&vec![""; vals.len()]);
// relabel first n labels to empty string
let mut relabeled_vals = vals.to_vec();
for label in relabeled_vals.iter_mut().take(self.relabel_num) {
*label = "";
}
return self.metric.with_label_values(&relabeled_vals);
}
self.metric.with_label_values(vals)
}
}

#[derive(Clone, Debug)]
pub struct RelabeledCounterVec {
relabel_threshold: StorageMetricLevel,
metric_level: StorageMetricLevel,
relabel_threshold: MetricLevel,
metric_level: MetricLevel,
metric: GenericCounterVec<AtomicU64>,
relabel_num: usize,
}

impl RelabeledCounterVec {
pub fn with_metric_level(
metric_level: StorageMetricLevel,
metric_level: MetricLevel,
metric: GenericCounterVec<AtomicU64>,
relabel_threshold: MetricLevel,
) -> Self {
Self {
relabel_threshold,
metric_level,
metric,
relabel_num: usize::MAX,
}
}

pub fn with_metric_level_relabel_n(
metric_level: MetricLevel,
metric: GenericCounterVec<AtomicU64>,
relabel_threshold: StorageMetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> Self {
Self {
relabel_threshold,
metric_level,
metric,
relabel_num,
}
}

pub fn with_label_values(&self, vals: &[&str]) -> GenericCounter<AtomicU64> {
if self.metric_level > self.relabel_threshold {
return self.metric.with_label_values(&vec![""; vals.len()]);
// relabel first n labels to empty string
let mut relabeled_vals = vals.to_vec();
for label in relabeled_vals.iter_mut().take(self.relabel_num) {
*label = "";
}
return self.metric.with_label_values(&relabeled_vals);
}
self.metric.with_label_values(vals)
}
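As a usage sketch for the new `relabel_num` knob (not code from this PR): register a per-actor histogram and wrap it so that only the leading `actor_id` label is dropped at lower metric levels, while `fragment_id` survives for the dashboards above. The helper name, metric options, and exact label layout are assumptions made for illustration; the real streaming metrics use their own registration macros.

use prometheus::{HistogramOpts, HistogramVec, Registry};
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::RelabeledHistogramVec;

// Hypothetical helper, for illustration only.
fn register_join_align_histogram(registry: &Registry, configured: MetricLevel) -> RelabeledHistogramVec {
    let inner = HistogramVec::new(
        HistogramOpts::new(
            "stream_join_barrier_align_duration",
            "Duration of join barrier alignment",
        ),
        &["actor_id", "fragment_id", "wait_side"],
    )
    .unwrap();
    registry.register(Box::new(inner.clone())).unwrap();

    // Treat this as a Debug-level (per-actor) metric: with relabel_num = 1, only
    // the leading actor_id label is blanked when Debug exceeds the configured
    // level, keeping fragment_id and wait_side intact.
    RelabeledHistogramVec::with_metric_level_relabel_n(MetricLevel::Debug, inner, configured, 1)
}

fn main() {
    let registry = Registry::new();
    let histogram = register_join_align_histogram(&registry, MetricLevel::Info);
    // Recorded as {actor_id="", fragment_id="2", wait_side="left"} under the Info default.
    histogram.with_label_values(&["1", "2", "left"]).observe(0.01);
}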
6 changes: 4 additions & 2 deletions src/compute/src/server.rs
@@ -59,7 +59,7 @@ use risingwave_storage::monitor::{
};
use risingwave_storage::opts::StorageOpts;
use risingwave_storage::StateStoreImpl;
use risingwave_stream::executor::monitor::GLOBAL_STREAMING_METRICS;
use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
@@ -169,7 +169,9 @@ pub async fn compute_node_serve(
// Initialize the metrics subsystem.
let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
let streaming_metrics = Arc::new(GLOBAL_STREAMING_METRICS.clone());
let streaming_metrics = Arc::new(global_streaming_metrics(
config.streaming.streaming_metric_level,
));
let batch_task_metrics = Arc::new(GLOBAL_BATCH_TASK_METRICS.clone());
let batch_executor_metrics = Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone());
let batch_manager_metrics = GLOBAL_BATCH_MANAGER_METRICS.clone();
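The switch from the `GLOBAL_STREAMING_METRICS` static to a `global_streaming_metrics(level)` call suggests a lazily initialized, level-aware global. Below is a minimal sketch of that pattern; the `StreamingMetrics` stand-in, its constructor, and the OnceLock wiring are assumptions, not the crate's real definitions.

use std::sync::OnceLock;

use risingwave_common::config::MetricLevel;

// Stand-in for risingwave_stream's StreamingMetrics; only the shape matters here.
#[derive(Clone, Debug)]
struct StreamingMetrics {
    level: MetricLevel,
    // ... prometheus handles, wrapped in Relabeled*Vec with `level` as threshold ...
}

impl StreamingMetrics {
    fn new(level: MetricLevel) -> Self {
        Self { level }
    }
}

// First caller wins: the level passed on the first call initializes the shared
// instance; later callers get a clone of the same handle.
fn global_streaming_metrics(level: MetricLevel) -> StreamingMetrics {
    static GLOBAL: OnceLock<StreamingMetrics> = OnceLock::new();
    GLOBAL.get_or_init(|| StreamingMetrics::new(level)).clone()
}

fn main() {
    let metrics = global_streaming_metrics(MetricLevel::Info);
    assert!(metrics.level == MetricLevel::Info);
}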
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -67,6 +67,7 @@ batch_chunk_size = 1024
in_flight_barrier_nums = 10000
async_stack_trace = "ReleaseVerbose"
unique_user_stream_errors = 10
streaming_metric_level = "Info"

[streaming.developer]
stream_enable_executor_row_count = false