Skip to content

Commit 8cc7bd4

Browse files
committed
remove option
1 parent e0b9df4 commit 8cc7bd4

File tree

1 file changed

+27
-32
lines changed

1 file changed

+27
-32
lines changed

src/stream/src/cache/managed_lru.rs

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -39,24 +39,22 @@ pub struct ManagedLruCache<K, V, S = DefaultHasher, A: Clone + Allocator = Globa
3939
/// The heap size of keys/values
4040
kv_heap_size: usize,
4141
/// The metrics of memory usage
42-
memory_usage_metrics: Option<IntGauge>,
42+
memory_usage_metrics: IntGauge,
4343
// Metrics info
44-
metrics_info: Option<MetricsInfo>,
44+
metrics_info: MetricsInfo,
4545
/// The size reported last time
4646
last_reported_size_bytes: usize,
4747
}
4848

4949
impl<K, V, S, A: Clone + Allocator> Drop for ManagedLruCache<K, V, S, A> {
5050
fn drop(&mut self) {
51-
if let Some(metrics) = &self.memory_usage_metrics {
52-
metrics.set(0.into());
53-
}
54-
if let Some(info) = &self.metrics_info {
55-
info.metrics
56-
.stream_memory_usage
57-
.remove_label_values(&[&info.table_id, &info.actor_id, &info.desc])
58-
.unwrap();
59-
}
51+
let info = &self.metrics_info;
52+
self.memory_usage_metrics.set(0.into());
53+
54+
info.metrics
55+
.stream_memory_usage
56+
.remove_label_values(&[&info.table_id, &info.actor_id, &info.desc])
57+
.unwrap();
6058
}
6159
}
6260

@@ -66,15 +64,16 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
6664
pub fn new_inner(
6765
inner: LruCache<K, V, S, A>,
6866
watermark_epoch: Arc<AtomicU64>,
69-
metrics_info: Option<MetricsInfo>,
67+
metrics_info: MetricsInfo,
7068
) -> Self {
71-
let memory_usage_metrics = metrics_info.as_ref().map(|info| {
72-
info.metrics.stream_memory_usage.with_label_values(&[
73-
&info.table_id,
74-
&info.actor_id,
75-
&info.desc,
76-
])
77-
});
69+
let memory_usage_metrics = metrics_info
70+
.metrics
71+
.stream_memory_usage
72+
.with_label_values(&[
73+
&metrics_info.table_id,
74+
&metrics_info.actor_id,
75+
&metrics_info.desc,
76+
]);
7877

7978
Self {
8079
inner,
@@ -219,9 +218,7 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
219218
if self.kv_heap_size.abs_diff(self.last_reported_size_bytes)
220219
> REPORT_SIZE_EVERY_N_KB_CHANGE << 10
221220
{
222-
if let Some(metrics) = self.memory_usage_metrics.as_ref() {
223-
metrics.set(self.kv_heap_size as _);
224-
}
221+
self.memory_usage_metrics.set(self.kv_heap_size as _);
225222
self.last_reported_size_bytes = self.kv_heap_size;
226223
true
227224
} else {
@@ -234,7 +231,7 @@ pub fn new_unbounded<K: Hash + Eq + EstimateSize, V: EstimateSize>(
234231
watermark_epoch: Arc<AtomicU64>,
235232
metrics_info: MetricsInfo,
236233
) -> ManagedLruCache<K, V> {
237-
ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, Some(metrics_info))
234+
ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, metrics_info)
238235
}
239236

240237
pub fn new_with_hasher_in<
@@ -251,7 +248,7 @@ pub fn new_with_hasher_in<
251248
ManagedLruCache::new_inner(
252249
LruCache::unbounded_with_hasher_in(hasher, alloc),
253250
watermark_epoch,
254-
Some(metrics_info),
251+
metrics_info,
255252
)
256253
}
257254

@@ -263,7 +260,7 @@ pub fn new_with_hasher<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHas
263260
ManagedLruCache::new_inner(
264261
LruCache::unbounded_with_hasher(hasher),
265262
watermark_epoch,
266-
Some(metrics_info),
263+
metrics_info,
267264
)
268265
}
269266

@@ -274,15 +271,15 @@ pub struct MutGuard<'a, V: EstimateSize> {
274271
// The total size of a collection
275272
total_size: &'a mut usize,
276273
last_reported_size_bytes: &'a mut usize,
277-
memory_usage_metrics: &'a mut Option<IntGauge>,
274+
memory_usage_metrics: &'a mut IntGauge,
278275
}
279276

280277
impl<'a, V: EstimateSize> MutGuard<'a, V> {
281278
pub fn new(
282279
inner: &'a mut V,
283280
total_size: &'a mut usize,
284281
last_reported_size_bytes: &'a mut usize,
285-
memory_usage_metrics: &'a mut Option<IntGauge>,
282+
memory_usage_metrics: &'a mut IntGauge,
286283
) -> Self {
287284
let original_val_size = inner.estimated_size();
288285
Self {
@@ -298,9 +295,7 @@ impl<'a, V: EstimateSize> MutGuard<'a, V> {
298295
if self.total_size.abs_diff(*self.last_reported_size_bytes)
299296
> REPORT_SIZE_EVERY_N_KB_CHANGE << 10
300297
{
301-
if let Some(metrics) = self.memory_usage_metrics.as_ref() {
302-
metrics.set(*self.total_size as _);
303-
}
298+
self.memory_usage_metrics.set(*self.total_size as _);
304299
*self.last_reported_size_bytes = *self.total_size;
305300
true
306301
} else {
@@ -340,15 +335,15 @@ pub struct UnsafeMutGuard<V: EstimateSize> {
340335
// The total size of a collection
341336
total_size: NonNull<usize>,
342337
last_reported_size_bytes: NonNull<usize>,
343-
memory_usage_metrics: NonNull<Option<IntGauge>>,
338+
memory_usage_metrics: NonNull<IntGauge>,
344339
}
345340

346341
impl<V: EstimateSize> UnsafeMutGuard<V> {
347342
pub fn new(
348343
inner: &mut V,
349344
total_size: &mut usize,
350345
last_reported_size_bytes: &mut usize,
351-
memory_usage_metrics: &mut Option<IntGauge>,
346+
memory_usage_metrics: &mut IntGauge,
352347
) -> Self {
353348
let original_val_size = inner.estimated_size();
354349
Self {

0 commit comments

Comments
 (0)