Skip to content

Commit 30e2f46

Browse files
Eric FuLi0k
Eric Fu
authored andcommitted
refactor(metrics): support streaming metrics level (#11994)
1 parent 1d4e98e commit 30e2f46

17 files changed

+242
-146
lines changed

docker/dashboards/risingwave-dev-dashboard.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

grafana/risingwave-dev-dashboard.dashboard.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,14 +1071,14 @@ def section_streaming_actors(outer_panels):
10711071
[
10721072
*quantile(
10731073
lambda quantile, legend: panels.target(
1074-
f"histogram_quantile({quantile}, sum(rate({metric('stream_join_barrier_align_duration_bucket')}[$__rate_interval])) by (le, actor_id, wait_side, job, instance))",
1075-
f"p{legend} {{{{actor_id}}}}.{{{{wait_side}}}} - {{{{job}}}} @ {{{{instance}}}}",
1074+
f"histogram_quantile({quantile}, sum(rate({metric('stream_join_barrier_align_duration_bucket')}[$__rate_interval])) by (le, fragment_id, wait_side, job, instance))",
1075+
f"p{legend} - fragment {{{{fragment_id}}}} {{{{wait_side}}}} - {{{{job}}}} @ {{{{instance}}}}",
10761076
),
10771077
[90, 99, 999, "max"],
10781078
),
10791079
panels.target(
1080-
f"sum by(le, actor_id, wait_side, job, instance)(rate({metric('stream_join_barrier_align_duration_sum')}[$__rate_interval])) / sum by(le,actor_id,wait_side,job,instance) (rate({metric('stream_join_barrier_align_duration_count')}[$__rate_interval]))",
1081-
"avg {{actor_id}}.{{wait_side}} - {{job}} @ {{instance}}",
1080+
f"sum by(le, fragment_id, wait_side, job, instance)(rate({metric('stream_join_barrier_align_duration_sum')}[$__rate_interval])) / sum by(le,fragment_id,wait_side,job,instance) (rate({metric('stream_join_barrier_align_duration_count')}[$__rate_interval]))",
1081+
"avg - fragment {{fragment_id}} {{wait_side}} - {{job}} @ {{instance}}",
10821082
),
10831083
],
10841084
),
@@ -1135,14 +1135,14 @@ def section_streaming_actors(outer_panels):
11351135
[
11361136
*quantile(
11371137
lambda quantile, legend: panels.target(
1138-
f"histogram_quantile({quantile}, sum(rate({metric('stream_join_matched_join_keys_bucket')}[$__rate_interval])) by (le, actor_id, table_id, job, instance))",
1139-
f"p{legend} - actor_id {{{{actor_id}}}} table_id {{{{table_id}}}} - {{{{job}}}} @ {{{{instance}}}}",
1138+
f"histogram_quantile({quantile}, sum(rate({metric('stream_join_matched_join_keys_bucket')}[$__rate_interval])) by (le, fragment_id, table_id, job, instance))",
1139+
f"p{legend} - fragment {{{{fragment_id}}}} table_id {{{{table_id}}}} - {{{{job}}}} @ {{{{instance}}}}",
11401140
),
11411141
[90, 99, "max"],
11421142
),
11431143
panels.target(
1144-
f"sum by(le, job, instance, actor_id, table_id) (rate({metric('stream_join_matched_join_keys_sum')}[$__rate_interval])) / sum by(le, job, instance, actor_id, table_id) (rate({table_metric('stream_join_matched_join_keys_count')}[$__rate_interval]))",
1145-
"avg - actor_id {{actor_id}} table_id {{table_id}} - {{job}} @ {{instance}}",
1144+
f"sum by(le, job, instance, actor_id, table_id) (rate({metric('stream_join_matched_join_keys_sum')}[$__rate_interval])) / sum by(le, job, instance, fragment_id, table_id) (rate({table_metric('stream_join_matched_join_keys_count')}[$__rate_interval]))",
1145+
"avg - fragment {{fragment_id}} table_id {{table_id}} - {{job}} @ {{instance}}",
11461146
),
11471147
],
11481148
),

grafana/risingwave-dev-dashboard.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

src/common/src/config.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -420,26 +420,29 @@ pub struct StreamingConfig {
420420
#[serde(default = "default::streaming::unique_user_stream_errors")]
421421
pub unique_user_stream_errors: usize,
422422

423+
#[serde(default = "default::streaming::streaming_metric_level")]
424+
pub streaming_metric_level: MetricLevel,
425+
423426
#[serde(default, flatten)]
424427
pub unrecognized: Unrecognized<Self>,
425428
}
426429

427430
#[derive(Debug, Default, Clone, Copy, ValueEnum, Serialize, Deserialize)]
428-
pub enum StorageMetricLevel {
431+
pub enum MetricLevel {
429432
#[default]
430433
Disabled = 0,
431434
Critical = 1,
432435
Info = 2,
433436
Debug = 3,
434437
}
435438

436-
impl PartialEq<Self> for StorageMetricLevel {
439+
impl PartialEq<Self> for MetricLevel {
437440
fn eq(&self, other: &Self) -> bool {
438441
(*self as u8).eq(&(*other as u8))
439442
}
440443
}
441444

442-
impl PartialOrd for StorageMetricLevel {
445+
impl PartialOrd for MetricLevel {
443446
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
444447
(*self as u8).partial_cmp(&(*other as u8))
445448
}
@@ -567,7 +570,7 @@ pub struct StorageConfig {
567570
pub compactor_max_sst_size: u64,
568571

569572
#[serde(default = "default::storage::storage_metric_level")]
570-
pub storage_metric_level: StorageMetricLevel,
573+
pub storage_metric_level: MetricLevel,
571574

572575
#[serde(default, flatten)]
573576
pub unrecognized: Unrecognized<Self>,
@@ -938,7 +941,7 @@ pub mod default {
938941
}
939942

940943
pub mod storage {
941-
use crate::config::StorageMetricLevel;
944+
use crate::config::MetricLevel;
942945

943946
pub fn share_buffers_sync_parallelism() -> u32 {
944947
1
@@ -1045,13 +1048,13 @@ pub mod default {
10451048
512 * 1024 * 1024 // 512m
10461049
}
10471050

1048-
pub fn storage_metric_level() -> StorageMetricLevel {
1049-
StorageMetricLevel::Info
1051+
pub fn storage_metric_level() -> MetricLevel {
1052+
MetricLevel::Info
10501053
}
10511054
}
10521055

10531056
pub mod streaming {
1054-
use crate::config::AsyncStackTraceOption;
1057+
use crate::config::{AsyncStackTraceOption, MetricLevel};
10551058

10561059
pub fn in_flight_barrier_nums() -> usize {
10571060
// quick fix
@@ -1066,6 +1069,10 @@ pub mod default {
10661069
pub fn unique_user_stream_errors() -> usize {
10671070
10
10681071
}
1072+
1073+
pub fn streaming_metric_level() -> MetricLevel {
1074+
MetricLevel::Info
1075+
}
10691076
}
10701077

10711078
pub mod file_cache {

src/common/src/metrics.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ use tracing_subscriber::Layer;
2525

2626
use crate::monitor::GLOBAL_METRICS_REGISTRY;
2727

28+
mod relabeled_metric;
29+
pub use relabeled_metric::*;
30+
2831
#[derive(Debug)]
2932
pub struct TrAdderAtomic(TrAdder<i64>);
3033

src/storage/src/monitor/relabeled_metric.rs renamed to src/common/src/metrics/relabeled_metric.rs

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414

1515
use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
1616
use prometheus::{Histogram, HistogramVec};
17-
use risingwave_common::config::StorageMetricLevel;
17+
18+
use crate::config::MetricLevel;
1819

1920
/// For all `Relabeled*Vec` below,
2021
/// - when `metric_level` <= `relabel_threshold`, they behaves exactly the same as their inner
@@ -26,58 +27,105 @@ use risingwave_common::config::StorageMetricLevel;
2627
/// We could have use one single struct to represent all `MetricVec<T: MetricVecBuilder>`, rather
2728
/// than specializing them one by one. However, that's undoable because prometheus crate doesn't
2829
/// export `MetricVecBuilder` implementation like `HistogramVecBuilder`.
29-
3030
#[derive(Clone, Debug)]
3131
pub struct RelabeledHistogramVec {
32-
relabel_threshold: StorageMetricLevel,
33-
metric_level: StorageMetricLevel,
32+
relabel_threshold: MetricLevel,
33+
metric_level: MetricLevel,
3434
metric: HistogramVec,
35+
36+
/// The first `relabel_num` labels will be relabeled to empty string
37+
///
38+
/// For example, if `relabel_num` is 1, and the input labels are `["actor_id",
39+
/// "fragment_id", "table_id"]`, when threshold is reached, the label values will be
40+
/// `["", "<original_fragment_id>", "<original_table_id>"]`.
41+
relabel_num: usize,
3542
}
3643

3744
impl RelabeledHistogramVec {
3845
pub fn with_metric_level(
39-
metric_level: StorageMetricLevel,
46+
metric_level: MetricLevel,
4047
metric: HistogramVec,
41-
relabel_threshold: StorageMetricLevel,
48+
relabel_threshold: MetricLevel,
4249
) -> Self {
4350
Self {
4451
relabel_threshold,
4552
metric_level,
4653
metric,
54+
relabel_num: usize::MAX,
55+
}
56+
}
57+
58+
pub fn with_metric_level_relabel_n(
59+
metric_level: MetricLevel,
60+
metric: HistogramVec,
61+
relabel_threshold: MetricLevel,
62+
relabel_num: usize,
63+
) -> Self {
64+
Self {
65+
relabel_threshold,
66+
metric_level,
67+
metric,
68+
relabel_num,
4769
}
4870
}
4971

5072
pub fn with_label_values(&self, vals: &[&str]) -> Histogram {
5173
if self.metric_level > self.relabel_threshold {
52-
return self.metric.with_label_values(&vec![""; vals.len()]);
74+
// relabel first n labels to empty string
75+
let mut relabeled_vals = vals.to_vec();
76+
for label in relabeled_vals.iter_mut().take(self.relabel_num) {
77+
*label = "";
78+
}
79+
self.metric.with_label_values(&relabeled_vals);
5380
}
5481
self.metric.with_label_values(vals)
5582
}
5683
}
5784

5885
#[derive(Clone, Debug)]
5986
pub struct RelabeledCounterVec {
60-
relabel_threshold: StorageMetricLevel,
61-
metric_level: StorageMetricLevel,
87+
relabel_threshold: MetricLevel,
88+
metric_level: MetricLevel,
6289
metric: GenericCounterVec<AtomicU64>,
90+
relabel_num: usize,
6391
}
6492

6593
impl RelabeledCounterVec {
6694
pub fn with_metric_level(
67-
metric_level: StorageMetricLevel,
95+
metric_level: MetricLevel,
96+
metric: GenericCounterVec<AtomicU64>,
97+
relabel_threshold: MetricLevel,
98+
) -> Self {
99+
Self {
100+
relabel_threshold,
101+
metric_level,
102+
metric,
103+
relabel_num: usize::MAX,
104+
}
105+
}
106+
107+
pub fn with_metric_level_relabel_n(
108+
metric_level: MetricLevel,
68109
metric: GenericCounterVec<AtomicU64>,
69-
relabel_threshold: StorageMetricLevel,
110+
relabel_threshold: MetricLevel,
111+
relabel_num: usize,
70112
) -> Self {
71113
Self {
72114
relabel_threshold,
73115
metric_level,
74116
metric,
117+
relabel_num,
75118
}
76119
}
77120

78121
pub fn with_label_values(&self, vals: &[&str]) -> GenericCounter<AtomicU64> {
79122
if self.metric_level > self.relabel_threshold {
80-
return self.metric.with_label_values(&vec![""; vals.len()]);
123+
// relabel first n labels to empty string
124+
let mut relabeled_vals = vals.to_vec();
125+
for label in relabeled_vals.iter_mut().take(self.relabel_num) {
126+
*label = "";
127+
}
128+
self.metric.with_label_values(&relabeled_vals);
81129
}
82130
self.metric.with_label_values(vals)
83131
}

src/compute/src/server.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use risingwave_storage::monitor::{
5959
};
6060
use risingwave_storage::opts::StorageOpts;
6161
use risingwave_storage::StateStoreImpl;
62-
use risingwave_stream::executor::monitor::GLOBAL_STREAMING_METRICS;
62+
use risingwave_stream::executor::monitor::global_streaming_metrics;
6363
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
6464
use tokio::sync::oneshot::Sender;
6565
use tokio::task::JoinHandle;
@@ -169,7 +169,9 @@ pub async fn compute_node_serve(
169169
// Initialize the metrics subsystem.
170170
let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
171171
let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
172-
let streaming_metrics = Arc::new(GLOBAL_STREAMING_METRICS.clone());
172+
let streaming_metrics = Arc::new(global_streaming_metrics(
173+
config.streaming.streaming_metric_level,
174+
));
173175
let batch_task_metrics = Arc::new(GLOBAL_BATCH_TASK_METRICS.clone());
174176
let batch_executor_metrics = Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone());
175177
let batch_manager_metrics = GLOBAL_BATCH_MANAGER_METRICS.clone();

src/config/example.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ batch_chunk_size = 1024
6767
in_flight_barrier_nums = 10000
6868
async_stack_trace = "ReleaseVerbose"
6969
unique_user_stream_errors = 10
70+
streaming_metric_level = "Info"
7071

7172
[streaming.developer]
7273
stream_enable_executor_row_count = false

0 commit comments

Comments
 (0)