Skip to content

Commit e0b9df4

Browse files
committed
use metrics
1 parent ede3278 commit e0b9df4

File tree

11 files changed

+74
-46
lines changed

11 files changed

+74
-46
lines changed

src/stream/src/cache/managed_lru.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,6 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
232232

233233
pub fn new_unbounded<K: Hash + Eq + EstimateSize, V: EstimateSize>(
234234
watermark_epoch: Arc<AtomicU64>,
235-
) -> ManagedLruCache<K, V> {
236-
ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, None)
237-
}
238-
239-
pub fn new_unbounded_with_metrics<K: Hash + Eq + EstimateSize, V: EstimateSize>(
240-
watermark_epoch: Arc<AtomicU64>,
241235
metrics_info: MetricsInfo,
242236
) -> ManagedLruCache<K, V> {
243237
ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, Some(metrics_info))

src/stream/src/executor/aggregation/distinct.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
2626
use risingwave_storage::StateStore;
2727

2828
use super::{AggCall, GroupKey};
29-
use crate::cache::{new_unbounded_with_metrics, ManagedLruCache};
29+
use crate::cache::{new_unbounded, ManagedLruCache};
3030
use crate::common::metrics::MetricsInfo;
3131
use crate::common::table::state_table::StateTable;
3232
use crate::executor::monitor::StreamingMetrics;
@@ -45,7 +45,7 @@ struct ColumnDeduplicater<S: StateStore> {
4545
impl<S: StateStore> ColumnDeduplicater<S> {
4646
fn new(watermark_epoch: &Arc<AtomicU64>, metrics_info: MetricsInfo) -> Self {
4747
Self {
48-
cache: new_unbounded_with_metrics(watermark_epoch.clone(), metrics_info.clone()),
48+
cache: new_unbounded(watermark_epoch.clone(), metrics_info.clone()),
4949
metrics_info,
5050
_phantom: PhantomData,
5151
}

src/stream/src/executor/dedup/append_only_dedup.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
1517
use futures::{stream, StreamExt};
1618
use futures_async_stream::try_stream;
1719
use itertools::Itertools;
@@ -22,8 +24,10 @@ use risingwave_common::row::{OwnedRow, Row, RowExt};
2224
use risingwave_storage::StateStore;
2325

2426
use super::cache::DedupCache;
27+
use crate::common::metrics::MetricsInfo;
2528
use crate::common::table::state_table::StateTable;
2629
use crate::executor::error::StreamExecutorError;
30+
use crate::executor::monitor::StreamingMetrics;
2731
use crate::executor::{
2832
expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, Message,
2933
PkIndices, PkIndicesRef, StreamExecutorResult,
@@ -51,12 +55,15 @@ impl<S: StateStore> AppendOnlyDedupExecutor<S> {
5155
executor_id: u64,
5256
ctx: ActorContextRef,
5357
watermark_epoch: AtomicU64Ref,
58+
metrics: Arc<StreamingMetrics>,
5459
) -> Self {
5560
let schema = input.schema().clone();
61+
let metrics_info =
62+
MetricsInfo::new(metrics, state_table.table_id(), ctx.id, "AppendOnly Dedup");
5663
Self {
5764
input: Some(input),
5865
state_table,
59-
cache: DedupCache::new(watermark_epoch),
66+
cache: DedupCache::new(watermark_epoch, metrics_info),
6067
pk_indices,
6168
identity: format!("AppendOnlyDedupExecutor {:X}", executor_id),
6269
schema,
@@ -259,6 +266,7 @@ mod tests {
259266
1,
260267
ActorContext::create(123),
261268
Arc::new(AtomicU64::new(0)),
269+
Arc::new(StreamingMetrics::unused()),
262270
))
263271
.execute();
264272

src/stream/src/executor/dedup/cache.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::hash::Hash;
1717
use risingwave_common::estimate_size::EstimateSize;
1818

1919
use crate::cache::{new_unbounded, ManagedLruCache};
20+
use crate::common::metrics::MetricsInfo;
2021
use crate::task::AtomicU64Ref;
2122

2223
/// [`DedupCache`] is used for key deduplication. Currently, the cache behaves like a set that only
@@ -26,8 +27,8 @@ pub struct DedupCache<K: Hash + Eq + EstimateSize> {
2627
}
2728

2829
impl<K: Hash + Eq + EstimateSize> DedupCache<K> {
29-
pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
30-
let cache = new_unbounded(watermark_epoch);
30+
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
31+
let cache = new_unbounded(watermark_epoch, metrics_info);
3132
Self { inner: cache }
3233
}
3334

@@ -69,10 +70,11 @@ mod tests {
6970
use std::sync::Arc;
7071

7172
use super::DedupCache;
73+
use crate::common::metrics::MetricsInfo;
7274

7375
#[test]
7476
fn test_dedup_cache() {
75-
let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)));
77+
let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)), MetricsInfo::for_test());
7678

7779
cache.insert(10);
7880
assert!(cache.contains(&10));

src/stream/src/executor/lookup/cache.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use risingwave_common::estimate_size::{EstimateSize, KvSize, VecWithKvSize};
1919
use risingwave_common::row::{OwnedRow, Row, RowExt};
2020

2121
use crate::cache::{new_unbounded, ManagedLruCache};
22+
use crate::common::metrics::MetricsInfo;
2223
use crate::task::AtomicU64Ref;
2324

2425
/// A cache for lookup's arrangement side.
@@ -73,8 +74,8 @@ impl LookupCache {
7374
self.data.clear();
7475
}
7576

76-
pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
77-
let cache = new_unbounded(watermark_epoch);
77+
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
78+
let cache = new_unbounded(watermark_epoch, metrics_info);
7879
Self { data: cache }
7980
}
8081
}

src/stream/src/executor/lookup/impl_.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use risingwave_storage::StateStore;
3030

3131
use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch};
3232
use crate::cache::cache_may_stale;
33+
use crate::common::metrics::MetricsInfo;
3334
use crate::common::StreamChunkBuilder;
3435
use crate::executor::error::{StreamExecutorError, StreamExecutorResult};
3536
use crate::executor::lookup::cache::LookupCache;
@@ -206,6 +207,13 @@ impl<S: StateStore> LookupExecutor<S> {
206207
"mismatched output schema"
207208
);
208209

210+
let metrics_info = MetricsInfo::new(
211+
ctx.streaming_metrics.clone(),
212+
storage_table.table_id().table_id(),
213+
ctx.id,
214+
"Lookup",
215+
);
216+
209217
Self {
210218
ctx,
211219
chunk_data_types,
@@ -230,7 +238,7 @@ impl<S: StateStore> LookupExecutor<S> {
230238
},
231239
column_mapping,
232240
key_indices_mapping,
233-
lookup_cache: LookupCache::new(watermark_epoch),
241+
lookup_cache: LookupCache::new(watermark_epoch, metrics_info),
234242
chunk_size,
235243
}
236244
}

src/stream/src/executor/mview/materialize.rs

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use risingwave_storage::mem_table::KeyOp;
3838
use risingwave_storage::StateStore;
3939

4040
use crate::cache::{new_unbounded, ManagedLruCache};
41+
use crate::common::metrics::MetricsInfo;
4142
use crate::common::table::state_table::StateTableInner;
4243
use crate::executor::error::StreamExecutorError;
4344
use crate::executor::monitor::StreamingMetrics;
@@ -97,8 +98,9 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
9798
StateTableInner::from_table_catalog(table_catalog, store, vnodes).await
9899
};
99100

100-
let actor_id = actor_context.id;
101-
let table_id = table_catalog.id;
101+
let metrics_info =
102+
MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
103+
102104
Self {
103105
input,
104106
state_table,
@@ -109,7 +111,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
109111
pk_indices: arrange_columns,
110112
identity: format!("MaterializeExecutor {:X}", executor_id),
111113
},
112-
materialize_cache: MaterializeCache::new(watermark_epoch, metrics, actor_id, table_id),
114+
materialize_cache: MaterializeCache::new(watermark_epoch, metrics_info),
113115
conflict_behavior,
114116
}
115117
}
@@ -235,12 +237,7 @@ impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
235237
pk_indices: arrange_columns,
236238
identity: format!("MaterializeExecutor {:X}", executor_id),
237239
},
238-
materialize_cache: MaterializeCache::new(
239-
watermark_epoch,
240-
Arc::new(StreamingMetrics::unused()),
241-
0,
242-
0,
243-
),
240+
materialize_cache: MaterializeCache::new(watermark_epoch, MetricsInfo::for_test()),
244241
conflict_behavior,
245242
}
246243
}
@@ -436,10 +433,8 @@ impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S
436433
/// A cache for materialize executors.
437434
pub struct MaterializeCache<SD> {
438435
data: ManagedLruCache<Vec<u8>, CacheValue>,
436+
metrics_info: MetricsInfo,
439437
_serde: PhantomData<SD>,
440-
metrics: Arc<StreamingMetrics>,
441-
actor_id: String,
442-
table_id: String,
443438
}
444439

445440
#[derive(EnumAsInner, EstimateSize)]
@@ -451,19 +446,12 @@ pub enum CacheValue {
451446
type EmptyValue = ();
452447

453448
impl<SD: ValueRowSerde> MaterializeCache<SD> {
454-
pub fn new(
455-
watermark_epoch: AtomicU64Ref,
456-
metrics: Arc<StreamingMetrics>,
457-
actor_id: u32,
458-
table_id: u32,
459-
) -> Self {
460-
let cache = new_unbounded(watermark_epoch);
449+
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
450+
let cache = new_unbounded(watermark_epoch, metrics_info.clone());
461451
Self {
462452
data: cache,
453+
metrics_info,
463454
_serde: PhantomData,
464-
metrics,
465-
actor_id: actor_id.to_string(),
466-
table_id: table_id.to_string(),
467455
}
468456
}
469457

@@ -604,14 +592,16 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
604592
) -> StreamExecutorResult<()> {
605593
let mut futures = vec![];
606594
for key in keys {
607-
self.metrics
595+
self.metrics_info
596+
.metrics
608597
.materialize_cache_total_count
609-
.with_label_values(&[&self.table_id, &self.actor_id])
598+
.with_label_values(&[&self.metrics_info.table_id, &self.metrics_info.actor_id])
610599
.inc();
611600
if self.data.contains(key) {
612-
self.metrics
601+
self.metrics_info
602+
.metrics
613603
.materialize_cache_hit_count
614-
.with_label_values(&[&self.table_id, &self.actor_id])
604+
.with_label_values(&[&self.metrics_info.table_id, &self.metrics_info.actor_id])
615605
.inc();
616606
continue;
617607
}

src/stream/src/executor/over_window/eowc.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use risingwave_storage::StateStore;
3333

3434
use super::state::{create_window_state, EstimatedVecDeque, WindowState};
3535
use crate::cache::{new_unbounded, ManagedLruCache};
36+
use crate::common::metrics::MetricsInfo;
3637
use crate::common::table::state_table::StateTable;
3738
use crate::executor::over_window::state::{StateEvictHint, StateKey};
3839
use crate::executor::{
@@ -413,8 +414,15 @@ impl<S: StateStore> EowcOverWindowExecutor<S> {
413414
inner: mut this,
414415
} = self;
415416

417+
let metrics_info = MetricsInfo::new(
418+
this.actor_ctx.streaming_metrics.clone(),
419+
this.state_table.table_id(),
420+
this.actor_ctx.id,
421+
"EowcOverWindow",
422+
);
423+
416424
let mut vars = ExecutionVars {
417-
partitions: new_unbounded(this.watermark_epoch.clone()),
425+
partitions: new_unbounded(this.watermark_epoch.clone(), metrics_info),
418426
_phantom: PhantomData::<S>,
419427
};
420428

src/stream/src/executor/top_n/group_top_n.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use super::top_n_cache::TopNCacheTrait;
2929
use super::utils::*;
3030
use super::{ManagedTopNState, TopNCache};
3131
use crate::cache::{new_unbounded, ManagedLruCache};
32+
use crate::common::metrics::MetricsInfo;
3233
use crate::common::table::state_table::StateTable;
3334
use crate::error::StreamResult;
3435
use crate::executor::error::StreamExecutorResult;
@@ -113,6 +114,13 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutor<K,
113114
pk_indices, schema, ..
114115
} = input_info;
115116

117+
let metrics_info = MetricsInfo::new(
118+
ctx.streaming_metrics.clone(),
119+
state_table.table_id(),
120+
ctx.id,
121+
"GroupTopN",
122+
);
123+
116124
let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by);
117125
let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
118126

@@ -127,7 +135,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool> InnerGroupTopNExecutor<K,
127135
managed_state,
128136
storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
129137
group_by,
130-
caches: GroupTopNCache::new(watermark_epoch),
138+
caches: GroupTopNCache::new(watermark_epoch, metrics_info),
131139
cache_key_serde,
132140
ctx,
133141
})
@@ -139,8 +147,8 @@ pub struct GroupTopNCache<K: HashKey, const WITH_TIES: bool> {
139147
}
140148

141149
impl<K: HashKey, const WITH_TIES: bool> GroupTopNCache<K, WITH_TIES> {
142-
pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
143-
let cache = new_unbounded(watermark_epoch);
150+
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
151+
let cache = new_unbounded(watermark_epoch, metrics_info);
144152
Self { data: cache }
145153
}
146154
}

src/stream/src/executor/top_n/group_top_n_appendonly.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use super::group_top_n::GroupTopNCache;
4242
use super::top_n_cache::AppendOnlyTopNCacheTrait;
4343
use super::utils::*;
4444
use super::{ManagedTopNState, TopNCache};
45+
use crate::common::metrics::MetricsInfo;
4546
use crate::common::table::state_table::StateTable;
4647
use crate::error::StreamResult;
4748
use crate::executor::error::StreamExecutorResult;
@@ -133,6 +134,13 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
133134
pk_indices, schema, ..
134135
} = input_info;
135136

137+
let metrics_info = MetricsInfo::new(
138+
ctx.streaming_metrics.clone(),
139+
state_table.table_id(),
140+
ctx.id,
141+
"GroupTopN",
142+
);
143+
136144
let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by);
137145
let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());
138146

@@ -147,7 +155,7 @@ impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
147155
managed_state,
148156
storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
149157
group_by,
150-
caches: GroupTopNCache::new(watermark_epoch),
158+
caches: GroupTopNCache::new(watermark_epoch, metrics_info),
151159
cache_key_serde,
152160
ctx,
153161
})

src/stream/src/from_proto/append_only_dedup.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ impl ExecutorBuilder for AppendOnlyDedupExecutorBuilder {
5252
params.executor_id,
5353
params.actor_context,
5454
stream.get_watermark_epoch(),
55+
stream.streaming_metrics.clone(),
5556
)))
5657
}
5758
}

0 commit comments

Comments
 (0)