Skip to content

feat(metrics): add memory usage metrics for more executors #10351

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits on Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 27 additions & 38 deletions src/stream/src/cache/managed_lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,22 @@ pub struct ManagedLruCache<K, V, S = DefaultHasher, A: Clone + Allocator = Globa
/// The heap size of keys/values
kv_heap_size: usize,
/// The metrics of memory usage
memory_usage_metrics: Option<IntGauge>,
memory_usage_metrics: IntGauge,
// Metrics info
metrics_info: Option<MetricsInfo>,
metrics_info: MetricsInfo,
/// The size reported last time
last_reported_size_bytes: usize,
}

impl<K, V, S, A: Clone + Allocator> Drop for ManagedLruCache<K, V, S, A> {
fn drop(&mut self) {
if let Some(metrics) = &self.memory_usage_metrics {
metrics.set(0.into());
}
if let Some(info) = &self.metrics_info {
info.metrics
.stream_memory_usage
.remove_label_values(&[&info.table_id, &info.actor_id, &info.desc])
.unwrap();
}
let info = &self.metrics_info;
self.memory_usage_metrics.set(0.into());

info.metrics
.stream_memory_usage
.remove_label_values(&[&info.table_id, &info.actor_id, &info.desc])
.unwrap();
}
}

Expand All @@ -66,15 +64,16 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
pub fn new_inner(
inner: LruCache<K, V, S, A>,
watermark_epoch: Arc<AtomicU64>,
metrics_info: Option<MetricsInfo>,
metrics_info: MetricsInfo,
) -> Self {
let memory_usage_metrics = metrics_info.as_ref().map(|info| {
info.metrics.stream_memory_usage.with_label_values(&[
&info.table_id,
&info.actor_id,
&info.desc,
])
});
let memory_usage_metrics = metrics_info
.metrics
.stream_memory_usage
.with_label_values(&[
&metrics_info.table_id,
&metrics_info.actor_id,
&metrics_info.desc,
]);

Self {
inner,
Expand Down Expand Up @@ -219,9 +218,7 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
if self.kv_heap_size.abs_diff(self.last_reported_size_bytes)
> REPORT_SIZE_EVERY_N_KB_CHANGE << 10
{
if let Some(metrics) = self.memory_usage_metrics.as_ref() {
metrics.set(self.kv_heap_size as _);
}
self.memory_usage_metrics.set(self.kv_heap_size as _);
self.last_reported_size_bytes = self.kv_heap_size;
true
} else {
Expand All @@ -232,15 +229,9 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al

pub fn new_unbounded<K: Hash + Eq + EstimateSize, V: EstimateSize>(
watermark_epoch: Arc<AtomicU64>,
) -> ManagedLruCache<K, V> {
ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, None)
}

pub fn new_unbounded_with_metrics<K: Hash + Eq + EstimateSize, V: EstimateSize>(
watermark_epoch: Arc<AtomicU64>,
metrics_info: MetricsInfo,
) -> ManagedLruCache<K, V> {
ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, Some(metrics_info))
ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, metrics_info)
}

pub fn new_with_hasher_in<
Expand All @@ -257,7 +248,7 @@ pub fn new_with_hasher_in<
ManagedLruCache::new_inner(
LruCache::unbounded_with_hasher_in(hasher, alloc),
watermark_epoch,
Some(metrics_info),
metrics_info,
)
}

Expand All @@ -269,7 +260,7 @@ pub fn new_with_hasher<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHas
ManagedLruCache::new_inner(
LruCache::unbounded_with_hasher(hasher),
watermark_epoch,
Some(metrics_info),
metrics_info,
)
}

Expand All @@ -280,15 +271,15 @@ pub struct MutGuard<'a, V: EstimateSize> {
// The total size of a collection
total_size: &'a mut usize,
last_reported_size_bytes: &'a mut usize,
memory_usage_metrics: &'a mut Option<IntGauge>,
memory_usage_metrics: &'a mut IntGauge,
}

impl<'a, V: EstimateSize> MutGuard<'a, V> {
pub fn new(
inner: &'a mut V,
total_size: &'a mut usize,
last_reported_size_bytes: &'a mut usize,
memory_usage_metrics: &'a mut Option<IntGauge>,
memory_usage_metrics: &'a mut IntGauge,
) -> Self {
let original_val_size = inner.estimated_size();
Self {
Expand All @@ -304,9 +295,7 @@ impl<'a, V: EstimateSize> MutGuard<'a, V> {
if self.total_size.abs_diff(*self.last_reported_size_bytes)
> REPORT_SIZE_EVERY_N_KB_CHANGE << 10
{
if let Some(metrics) = self.memory_usage_metrics.as_ref() {
metrics.set(*self.total_size as _);
}
self.memory_usage_metrics.set(*self.total_size as _);
*self.last_reported_size_bytes = *self.total_size;
true
} else {
Expand Down Expand Up @@ -346,15 +335,15 @@ pub struct UnsafeMutGuard<V: EstimateSize> {
// The total size of a collection
total_size: NonNull<usize>,
last_reported_size_bytes: NonNull<usize>,
memory_usage_metrics: NonNull<Option<IntGauge>>,
memory_usage_metrics: NonNull<IntGauge>,
}

impl<V: EstimateSize> UnsafeMutGuard<V> {
pub fn new(
inner: &mut V,
total_size: &mut usize,
last_reported_size_bytes: &mut usize,
memory_usage_metrics: &mut Option<IntGauge>,
memory_usage_metrics: &mut IntGauge,
) -> Self {
let original_val_size = inner.estimated_size();
Self {
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/aggregation/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_storage::StateStore;

use super::{AggCall, GroupKey};
use crate::cache::{new_unbounded_with_metrics, ManagedLruCache};
use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTable;
use crate::executor::monitor::StreamingMetrics;
Expand All @@ -45,7 +45,7 @@ struct ColumnDeduplicater<S: StateStore> {
impl<S: StateStore> ColumnDeduplicater<S> {
fn new(watermark_epoch: &Arc<AtomicU64>, metrics_info: MetricsInfo) -> Self {
Self {
cache: new_unbounded_with_metrics(watermark_epoch.clone(), metrics_info.clone()),
cache: new_unbounded(watermark_epoch.clone(), metrics_info.clone()),
metrics_info,
_phantom: PhantomData,
}
Expand Down
10 changes: 9 additions & 1 deletion src/stream/src/executor/dedup/append_only_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures::{stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
Expand All @@ -22,8 +24,10 @@ use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_storage::StateStore;

use super::cache::DedupCache;
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, Message,
PkIndices, PkIndicesRef, StreamExecutorResult,
Expand Down Expand Up @@ -51,12 +55,15 @@ impl<S: StateStore> AppendOnlyDedupExecutor<S> {
executor_id: u64,
ctx: ActorContextRef,
watermark_epoch: AtomicU64Ref,
metrics: Arc<StreamingMetrics>,
) -> Self {
let schema = input.schema().clone();
let metrics_info =
MetricsInfo::new(metrics, state_table.table_id(), ctx.id, "AppendOnly Dedup");
Self {
input: Some(input),
state_table,
cache: DedupCache::new(watermark_epoch),
cache: DedupCache::new(watermark_epoch, metrics_info),
pk_indices,
identity: format!("AppendOnlyDedupExecutor {:X}", executor_id),
schema,
Expand Down Expand Up @@ -259,6 +266,7 @@ mod tests {
1,
ActorContext::create(123),
Arc::new(AtomicU64::new(0)),
Arc::new(StreamingMetrics::unused()),
))
.execute();

Expand Down
8 changes: 5 additions & 3 deletions src/stream/src/executor/dedup/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::hash::Hash;
use risingwave_common::estimate_size::EstimateSize;

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::task::AtomicU64Ref;

/// [`DedupCache`] is used for key deduplication. Currently, the cache behaves like a set that only
Expand All @@ -26,8 +27,8 @@ pub struct DedupCache<K: Hash + Eq + EstimateSize> {
}

impl<K: Hash + Eq + EstimateSize> DedupCache<K> {
pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
let cache = new_unbounded(watermark_epoch);
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
let cache = new_unbounded(watermark_epoch, metrics_info);
Self { inner: cache }
}

Expand Down Expand Up @@ -69,10 +70,11 @@ mod tests {
use std::sync::Arc;

use super::DedupCache;
use crate::common::metrics::MetricsInfo;

#[test]
fn test_dedup_cache() {
let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)));
let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)), MetricsInfo::for_test());

cache.insert(10);
assert!(cache.contains(&10));
Expand Down
5 changes: 3 additions & 2 deletions src/stream/src/executor/lookup/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::estimate_size::{EstimateSize, KvSize, VecWithKvSize};
use risingwave_common::row::{OwnedRow, Row, RowExt};

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::task::AtomicU64Ref;

/// A cache for lookup's arrangement side.
Expand Down Expand Up @@ -73,8 +74,8 @@ impl LookupCache {
self.data.clear();
}

pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
let cache = new_unbounded(watermark_epoch);
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
let cache = new_unbounded(watermark_epoch, metrics_info);
Self { data: cache }
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/stream/src/executor/lookup/impl_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_storage::StateStore;

use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch};
use crate::cache::cache_may_stale;
use crate::common::metrics::MetricsInfo;
use crate::common::StreamChunkBuilder;
use crate::executor::error::{StreamExecutorError, StreamExecutorResult};
use crate::executor::lookup::cache::LookupCache;
Expand Down Expand Up @@ -206,6 +207,13 @@ impl<S: StateStore> LookupExecutor<S> {
"mismatched output schema"
);

let metrics_info = MetricsInfo::new(
ctx.streaming_metrics.clone(),
storage_table.table_id().table_id(),
ctx.id,
"Lookup",
);

Self {
ctx,
chunk_data_types,
Expand All @@ -230,7 +238,7 @@ impl<S: StateStore> LookupExecutor<S> {
},
column_mapping,
key_indices_mapping,
lookup_cache: LookupCache::new(watermark_epoch),
lookup_cache: LookupCache::new(watermark_epoch, metrics_info),
chunk_size,
}
}
Expand Down
42 changes: 16 additions & 26 deletions src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use risingwave_storage::mem_table::KeyOp;
use risingwave_storage::StateStore;

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTableInner;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
Expand Down Expand Up @@ -97,8 +98,9 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
StateTableInner::from_table_catalog(table_catalog, store, vnodes).await
};

let actor_id = actor_context.id;
let table_id = table_catalog.id;
let metrics_info =
MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

Self {
input,
state_table,
Expand All @@ -109,7 +111,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
pk_indices: arrange_columns,
identity: format!("MaterializeExecutor {:X}", executor_id),
},
materialize_cache: MaterializeCache::new(watermark_epoch, metrics, actor_id, table_id),
materialize_cache: MaterializeCache::new(watermark_epoch, metrics_info),
conflict_behavior,
}
}
Expand Down Expand Up @@ -235,12 +237,7 @@ impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
pk_indices: arrange_columns,
identity: format!("MaterializeExecutor {:X}", executor_id),
},
materialize_cache: MaterializeCache::new(
watermark_epoch,
Arc::new(StreamingMetrics::unused()),
0,
0,
),
materialize_cache: MaterializeCache::new(watermark_epoch, MetricsInfo::for_test()),
conflict_behavior,
}
}
Expand Down Expand Up @@ -436,10 +433,8 @@ impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S
/// A cache for materialize executors.
pub struct MaterializeCache<SD> {
data: ManagedLruCache<Vec<u8>, CacheValue>,
metrics_info: MetricsInfo,
_serde: PhantomData<SD>,
metrics: Arc<StreamingMetrics>,
actor_id: String,
table_id: String,
}

#[derive(EnumAsInner, EstimateSize)]
Expand All @@ -451,19 +446,12 @@ pub enum CacheValue {
type EmptyValue = ();

impl<SD: ValueRowSerde> MaterializeCache<SD> {
pub fn new(
watermark_epoch: AtomicU64Ref,
metrics: Arc<StreamingMetrics>,
actor_id: u32,
table_id: u32,
) -> Self {
let cache = new_unbounded(watermark_epoch);
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
let cache = new_unbounded(watermark_epoch, metrics_info.clone());
Self {
data: cache,
metrics_info,
_serde: PhantomData,
metrics,
actor_id: actor_id.to_string(),
table_id: table_id.to_string(),
}
}

Expand Down Expand Up @@ -604,14 +592,16 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
) -> StreamExecutorResult<()> {
let mut futures = vec![];
for key in keys {
self.metrics
self.metrics_info
.metrics
.materialize_cache_total_count
.with_label_values(&[&self.table_id, &self.actor_id])
.with_label_values(&[&self.metrics_info.table_id, &self.metrics_info.actor_id])
.inc();
if self.data.contains(key) {
self.metrics
self.metrics_info
.metrics
.materialize_cache_hit_count
.with_label_values(&[&self.table_id, &self.actor_id])
.with_label_values(&[&self.metrics_info.table_id, &self.metrics_info.actor_id])
.inc();
continue;
}
Expand Down
Loading