Skip to content

Commit fdaf30e

Browse files
authored
chore: add over window executor cache metrics (#12092)
Signed-off-by: Richard Chien <[email protected]>
1 parent ec0027e commit fdaf30e

File tree

7 files changed

+93 additions, -2 deletions

docker/dashboards/risingwave-dev-dashboard.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

grafana/risingwave-dev-dashboard.dashboard.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,6 +1059,10 @@ def section_streaming_actors(outer_panels):
10591059
"materialize executor cache miss ratio - table {{table_id}} actor {{actor_id}} {{instance}}",
10601060
),
10611061

1062+
panels.target(
1063+
f"(sum(rate({metric('stream_over_window_cache_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_over_window_cache_lookup_count')}[$__rate_interval])) by (table_id, actor_id))",
1064+
"Over window cache miss ratio - table {{table_id}} actor {{actor_id}} ",
1065+
),
10621066
],
10631067
),
10641068
panels.timeseries_actor_latency(
@@ -1231,6 +1235,25 @@ def section_streaming_actors(outer_panels):
12311235

12321236
],
12331237
),
1238+
1239+
panels.timeseries_actor_ops(
1240+
"Over Window Executor Cache",
1241+
"",
1242+
[
1243+
panels.target(
1244+
f"rate({table_metric('stream_over_window_cached_entry_count')}[$__rate_interval])",
1245+
"cached entry count - table {{table_id}} - actor {{actor_id}} {{instance}}",
1246+
),
1247+
panels.target(
1248+
f"rate({table_metric('stream_over_window_cache_lookup_count')}[$__rate_interval])",
1249+
"cache lookup count - table {{table_id}} - actor {{actor_id}} {{instance}}",
1250+
),
1251+
panels.target(
1252+
f"rate({table_metric('stream_over_window_cache_miss_count')}[$__rate_interval])",
1253+
"cache miss count - table {{table_id}} - actor {{actor_id}} {{instance}}",
1254+
),
1255+
]
1256+
),
12341257
],
12351258
)
12361259
]

grafana/risingwave-dev-dashboard.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

src/stream/src/executor/monitor/streaming_stats.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ pub struct StreamingMetrics {
9999
pub arrangement_backfill_snapshot_read_row_count: GenericCounterVec<AtomicU64>,
100100
pub arrangement_backfill_upstream_output_row_count: GenericCounterVec<AtomicU64>,
101101

102+
// Over Window
103+
pub over_window_cached_entry_count: GenericGaugeVec<AtomicI64>,
104+
pub over_window_cache_lookup_count: GenericCounterVec<AtomicU64>,
105+
pub over_window_cache_miss_count: GenericCounterVec<AtomicU64>,
106+
102107
/// The duration from receipt of barrier to all actors collection.
103108
/// And the max of all node `barrier_inflight_latency` is the latency for a barrier
104109
/// to flow through the graph.
@@ -595,6 +600,30 @@ impl StreamingMetrics {
595600
)
596601
.unwrap();
597602

603+
let over_window_cached_entry_count = register_int_gauge_vec_with_registry!(
604+
"stream_over_window_cached_entry_count",
605+
"Total entry (partition) count in over window executor cache",
606+
&["table_id", "actor_id"],
607+
registry
608+
)
609+
.unwrap();
610+
611+
let over_window_cache_lookup_count = register_int_counter_vec_with_registry!(
612+
"stream_over_window_cache_lookup_count",
613+
"Over window executor cache lookup count",
614+
&["table_id", "actor_id"],
615+
registry
616+
)
617+
.unwrap();
618+
619+
let over_window_cache_miss_count = register_int_counter_vec_with_registry!(
620+
"stream_over_window_cache_miss_count",
621+
"Over window executor cache miss count",
622+
&["table_id", "actor_id"],
623+
registry
624+
)
625+
.unwrap();
626+
598627
let opts = histogram_opts!(
599628
"stream_barrier_inflight_duration_seconds",
600629
"barrier_inflight_latency",
@@ -777,6 +806,9 @@ impl StreamingMetrics {
777806
backfill_upstream_output_row_count,
778807
arrangement_backfill_snapshot_read_row_count,
779808
arrangement_backfill_upstream_output_row_count,
809+
over_window_cached_entry_count,
810+
over_window_cache_lookup_count,
811+
over_window_cache_miss_count,
780812
barrier_inflight_latency,
781813
barrier_sync_latency,
782814
barrier_manager_progress,

src/stream/src/executor/over_window/general.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::collections::{btree_map, BTreeMap, HashSet};
1616
use std::marker::PhantomData;
1717
use std::ops::RangeInclusive;
18+
use std::sync::Arc;
1819

1920
use futures::StreamExt;
2021
use futures_async_stream::try_stream;
@@ -41,6 +42,7 @@ use super::over_partition::{
4142
use crate::cache::{new_unbounded, ManagedLruCache};
4243
use crate::common::metrics::MetricsInfo;
4344
use crate::common::StreamChunkBuilder;
45+
use crate::executor::monitor::StreamingMetrics;
4446
use crate::executor::over_window::delta_btree_map::PositionType;
4547
use crate::executor::test_utils::prelude::StateTable;
4648
use crate::executor::{
@@ -73,6 +75,7 @@ struct ExecutorInner<S: StateStore> {
7375

7476
state_table: StateTable<S>,
7577
watermark_epoch: AtomicU64Ref,
78+
metrics: Arc<StreamingMetrics>,
7679

7780
/// The maximum size of the chunk produced by executor at a time.
7881
chunk_size: usize,
@@ -84,9 +87,16 @@ struct ExecutionVars<S: StateStore> {
8487
cached_partitions: ManagedLruCache<OwnedRow, PartitionCache>,
8588
/// partition key => recently accessed range.
8689
recently_accessed_ranges: BTreeMap<DefaultOrdered<OwnedRow>, RangeInclusive<StateKey>>,
90+
stats: ExecutionStats,
8791
_phantom: PhantomData<S>,
8892
}
8993

94+
#[derive(Default)]
95+
struct ExecutionStats {
96+
cache_miss: u64,
97+
cache_lookup: u64,
98+
}
99+
90100
impl<S: StateStore> Executor for OverWindowExecutor<S> {
91101
fn execute(self: Box<Self>) -> crate::executor::BoxedMessageStream {
92102
self.executor_inner().boxed()
@@ -146,6 +156,7 @@ pub struct OverWindowExecutorArgs<S: StateStore> {
146156

147157
pub state_table: StateTable<S>,
148158
pub watermark_epoch: AtomicU64Ref,
159+
pub metrics: Arc<StreamingMetrics>,
149160

150161
pub chunk_size: usize,
151162
pub cache_policy: CachePolicy,
@@ -196,6 +207,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
196207
input_schema_len: input_info.schema.len(),
197208
state_table: args.state_table,
198209
watermark_epoch: args.watermark_epoch,
210+
metrics: args.metrics,
199211
chunk_size: args.chunk_size,
200212
cache_policy,
201213
},
@@ -348,7 +360,9 @@ impl<S: StateStore> OverWindowExecutor<S> {
348360

349361
// Build final changes partition by partition.
350362
for (part_key, delta) in deltas {
363+
vars.stats.cache_lookup += 1;
351364
if !vars.cached_partitions.contains(&part_key.0) {
365+
vars.stats.cache_miss += 1;
352366
vars.cached_partitions
353367
.put(part_key.0.clone(), new_empty_partition_cache());
354368
}
@@ -593,6 +607,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
593607
let mut vars = ExecutionVars {
594608
cached_partitions: new_unbounded(this.watermark_epoch.clone(), metrics_info),
595609
recently_accessed_ranges: Default::default(),
610+
stats: Default::default(),
596611
_phantom: PhantomData::<S>,
597612
};
598613

@@ -622,6 +637,24 @@ impl<S: StateStore> OverWindowExecutor<S> {
622637
this.state_table.commit(barrier.epoch).await?;
623638
vars.cached_partitions.evict();
624639

640+
{
641+
// update metrics
642+
let actor_id_str = this.actor_ctx.id.to_string();
643+
let table_id_str = this.state_table.table_id().to_string();
644+
this.metrics
645+
.over_window_cached_entry_count
646+
.with_label_values(&[&table_id_str, &actor_id_str])
647+
.set(vars.cached_partitions.len() as _);
648+
this.metrics
649+
.over_window_cache_lookup_count
650+
.with_label_values(&[&table_id_str, &actor_id_str])
651+
.inc_by(std::mem::take(&mut vars.stats.cache_lookup));
652+
this.metrics
653+
.over_window_cache_miss_count
654+
.with_label_values(&[&table_id_str, &actor_id_str])
655+
.inc_by(std::mem::take(&mut vars.stats.cache_miss));
656+
}
657+
625658
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this.actor_ctx.id) {
626659
let (_, cache_may_stale) =
627660
this.state_table.update_vnode_bitmap(vnode_bitmap);

src/stream/src/from_proto/over_window.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ impl ExecutorBuilder for OverWindowExecutorBuilder {
7373
order_key_order_types,
7474
state_table,
7575
watermark_epoch: stream.get_watermark_epoch(),
76+
metrics: params.executor_stats,
7677
chunk_size: params.env.config().developer.chunk_size,
7778
cache_policy: OverWindowCachePolicy::from_protobuf(node.get_cache_policy()?),
7879
})

src/stream/tests/integration_tests/over_window.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use risingwave_expr::agg::{AggArgs, AggKind};
1717
use risingwave_expr::function::window::{
1818
Frame, FrameBound, FrameExclusion, WindowFuncCall, WindowFuncKind,
1919
};
20+
use risingwave_stream::executor::monitor::StreamingMetrics;
2021
use risingwave_stream::executor::{OverWindowExecutor, OverWindowExecutorArgs};
2122

2223
use crate::prelude::*;
@@ -78,6 +79,7 @@ async fn create_executor<S: StateStore>(
7879
order_key_order_types,
7980
state_table,
8081
watermark_epoch: Arc::new(AtomicU64::new(0)),
82+
metrics: Arc::new(StreamingMetrics::unused()),
8183
chunk_size: 1024,
8284
cache_policy: OverWindowCachePolicy::Recent,
8385
});

0 commit comments

Comments
 (0)