Skip to content

Commit 52caa65

Browse files
authored
revert: feat(streaming): call may_exist when insert cache miss in join executor (risingwavelabs#7957) (risingwavelabs#8655)
1 parent 86ffe99 commit 52caa65

File tree

6 files changed

+14
-92
lines changed

6 files changed

+14
-92
lines changed

grafana/risingwave-dashboard.dashboard.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,11 +1378,7 @@ def section_streaming_actors(outer_panels):
13781378
),
13791379
panels.target(
13801380
f"rate({metric('stream_join_insert_cache_miss_count')}[$__rate_interval])",
1381-
"cache miss when insert {{actor_id}} {{side}}",
1382-
),
1383-
panels.target(
1384-
f"rate({metric('stream_join_may_exist_true_count')}[$__rate_interval])",
1385-
"may_exist true when insert {{actor_id}} {{side}}",
1381+
"cache miss when insert{{actor_id}} {{side}}",
13861382
),
13871383
],
13881384
),

grafana/risingwave-dashboard.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/stream/src/common/table/state_table.rs

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -315,31 +315,6 @@ where
315315
Distribution::fallback(),
316316
None,
317317
false,
318-
0,
319-
)
320-
.await
321-
}
322-
323-
/// Create a state table without distribution, with given `prefix_hint_len`, used for unit
324-
/// tests.
325-
pub async fn new_without_distribution_with_prefix_hint_len(
326-
store: S,
327-
table_id: TableId,
328-
columns: Vec<ColumnDesc>,
329-
order_types: Vec<OrderType>,
330-
pk_indices: Vec<usize>,
331-
prefix_hint_len: usize,
332-
) -> Self {
333-
Self::new_with_distribution_inner(
334-
store,
335-
table_id,
336-
columns,
337-
order_types,
338-
pk_indices,
339-
Distribution::fallback(),
340-
None,
341-
true,
342-
prefix_hint_len,
343318
)
344319
.await
345320
}
@@ -364,7 +339,6 @@ where
364339
distribution,
365340
value_indices,
366341
true,
367-
0,
368342
)
369343
.await
370344
}
@@ -387,7 +361,6 @@ where
387361
distribution,
388362
value_indices,
389363
false,
390-
0,
391364
)
392365
.await
393366
}
@@ -405,7 +378,6 @@ where
405378
}: Distribution,
406379
value_indices: Option<Vec<usize>>,
407380
is_consistent_op: bool,
408-
prefix_hint_len: usize,
409381
) -> Self {
410382
let local_state_store = store
411383
.new_local(NewLocalOptions {
@@ -446,7 +418,7 @@ where
446418
row_serde: SD::new(&column_ids, Arc::from(data_types.into_boxed_slice())),
447419
pk_indices,
448420
dist_key_in_pk_indices,
449-
prefix_hint_len,
421+
prefix_hint_len: 0,
450422
vnodes,
451423
table_option: Default::default(),
452424
vnode_col_idx_in_pk: None,

src/stream/src/executor/hash_join.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,20 +1047,18 @@ mod tests {
10471047
order_types: &[OrderType],
10481048
pk_indices: &[usize],
10491049
table_id: u32,
1050-
prefix_hint_len: usize,
10511050
) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
10521051
let column_descs = data_types
10531052
.iter()
10541053
.enumerate()
10551054
.map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
10561055
.collect_vec();
1057-
let state_table = StateTable::new_without_distribution_with_prefix_hint_len(
1056+
let state_table = StateTable::new_without_distribution(
10581057
mem_state.clone(),
10591058
TableId::new(table_id),
10601059
column_descs,
10611060
order_types.to_vec(),
10621061
pk_indices.to_vec(),
1063-
prefix_hint_len,
10641062
)
10651063
.await;
10661064

@@ -1111,9 +1109,8 @@ mod tests {
11111109
};
11121110
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![1]);
11131111
let (tx_r, source_r) = MockSource::channel(schema, vec![1]);
1114-
let join_key_indices = vec![0];
1115-
let params_l = JoinParams::new(join_key_indices.clone(), vec![1]);
1116-
let params_r = JoinParams::new(join_key_indices.clone(), vec![1]);
1112+
let params_l = JoinParams::new(vec![0], vec![1]);
1113+
let params_r = JoinParams::new(vec![0], vec![1]);
11171114
let cond = with_condition.then(create_cond);
11181115

11191116
let mem_state = MemoryStateStore::new();
@@ -1124,7 +1121,6 @@ mod tests {
11241121
&[OrderType::ascending(), OrderType::ascending()],
11251122
&[0, 1],
11261123
0,
1127-
join_key_indices.len(),
11281124
)
11291125
.await;
11301126

@@ -1134,7 +1130,6 @@ mod tests {
11341130
&[OrderType::ascending(), OrderType::ascending()],
11351131
&[0, 1],
11361132
2,
1137-
join_key_indices.len(),
11381133
)
11391134
.await;
11401135

@@ -1180,9 +1175,8 @@ mod tests {
11801175
};
11811176
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![0]);
11821177
let (tx_r, source_r) = MockSource::channel(schema, vec![0]);
1183-
let join_key_indices = vec![0, 1];
1184-
let params_l = JoinParams::new(join_key_indices.clone(), vec![]);
1185-
let params_r = JoinParams::new(join_key_indices.clone(), vec![]);
1178+
let params_l = JoinParams::new(vec![0, 1], vec![]);
1179+
let params_r = JoinParams::new(vec![0, 1], vec![]);
11861180
let cond = with_condition.then(create_cond);
11871181

11881182
let mem_state = MemoryStateStore::new();
@@ -1197,7 +1191,6 @@ mod tests {
11971191
],
11981192
&[0, 1, 0],
11991193
0,
1200-
join_key_indices.len(),
12011194
)
12021195
.await;
12031196

@@ -1211,7 +1204,6 @@ mod tests {
12111204
],
12121205
&[0, 1, 1],
12131206
0,
1214-
join_key_indices.len(),
12151207
)
12161208
.await;
12171209
let schema_len = match T {

src/stream/src/executor/managed_state/join/mod.rs

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ pub struct JoinHashMapMetrics {
161161
total_lookup_count: usize,
162162
/// How many times have we miss the cache when insert row
163163
insert_cache_miss_count: usize,
164-
may_exist_true_count: usize,
165164
}
166165

167166
impl JoinHashMapMetrics {
@@ -173,7 +172,6 @@ impl JoinHashMapMetrics {
173172
lookup_miss_count: 0,
174173
total_lookup_count: 0,
175174
insert_cache_miss_count: 0,
176-
may_exist_true_count: 0,
177175
}
178176
}
179177

@@ -190,14 +188,9 @@ impl JoinHashMapMetrics {
190188
.join_insert_cache_miss_count
191189
.with_label_values(&[&self.actor_id, self.side])
192190
.inc_by(self.insert_cache_miss_count as u64);
193-
self.metrics
194-
.join_may_exist_true_count
195-
.with_label_values(&[&self.actor_id, self.side])
196-
.inc_by(self.may_exist_true_count as u64);
197191
self.total_lookup_count = 0;
198192
self.lookup_miss_count = 0;
199193
self.insert_cache_miss_count = 0;
200-
self.may_exist_true_count = 0;
201194
}
202195
}
203196

@@ -435,22 +428,11 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
435428
// Update cache
436429
entry.insert(pk, value.encode());
437430
} else if self.pk_contained_in_jk {
438-
// Refill cache when the join key contains primary key.
431+
// Refill cache when the join key exist in neither cache or storage.
439432
self.metrics.insert_cache_miss_count += 1;
440433
let mut state = JoinEntryState::default();
441434
state.insert(pk, value.encode());
442435
self.update_state(key, state.into());
443-
} else {
444-
let prefix = key.deserialize(&self.join_key_data_types)?;
445-
self.metrics.insert_cache_miss_count += 1;
446-
// Refill cache when the join key exists in neither cache or storage.
447-
if !self.state.table.may_exist(&prefix).await? {
448-
let mut state = JoinEntryState::default();
449-
state.insert(pk, value.encode());
450-
self.update_state(key, state.into());
451-
} else {
452-
self.metrics.may_exist_true_count += 1;
453-
}
454436
}
455437

456438
// Update the flush buffer.
@@ -462,6 +444,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
462444

463445
/// Insert a row.
464446
/// Used when the side does not need to update degree.
447+
#[allow(clippy::unused_async)]
465448
pub async fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> {
466449
let join_row = JoinRow::new(&value, 0);
467450
let pk = (&value)
@@ -471,22 +454,11 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
471454
// Update cache
472455
entry.insert(pk, join_row.encode());
473456
} else if self.pk_contained_in_jk {
474-
// Refill cache when the join key contains primary key.
457+
// Refill cache when the join key exist in neither cache or storage.
475458
self.metrics.insert_cache_miss_count += 1;
476459
let mut state = JoinEntryState::default();
477460
state.insert(pk, join_row.encode());
478461
self.update_state(key, state.into());
479-
} else {
480-
let prefix = key.deserialize(&self.join_key_data_types)?;
481-
self.metrics.insert_cache_miss_count += 1;
482-
// Refill cache when the join key exists in neither cache or storage.
483-
if !self.state.table.may_exist(&prefix).await? {
484-
let mut state = JoinEntryState::default();
485-
state.insert(pk, join_row.encode());
486-
self.update_state(key, state.into());
487-
} else {
488-
self.metrics.may_exist_true_count += 1;
489-
}
490462
}
491463

492464
// Update the flush buffer.

src/stream/src/executor/monitor/streaming_stats.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ pub struct StreamingMetrics {
5353
pub join_lookup_miss_count: GenericCounterVec<AtomicU64>,
5454
pub join_total_lookup_count: GenericCounterVec<AtomicU64>,
5555
pub join_insert_cache_miss_count: GenericCounterVec<AtomicU64>,
56-
pub join_may_exist_true_count: GenericCounterVec<AtomicU64>,
5756
pub join_actor_input_waiting_duration_ns: GenericCounterVec<AtomicU64>,
5857
pub join_match_duration_ns: GenericCounterVec<AtomicU64>,
5958
pub join_barrier_align_duration: HistogramVec,
@@ -275,31 +274,23 @@ impl StreamingMetrics {
275274

276275
let join_lookup_miss_count = register_int_counter_vec_with_registry!(
277276
"stream_join_lookup_miss_count",
278-
"Join executor lookup miss count",
277+
"Join executor lookup miss duration",
279278
&["actor_id", "side"],
280279
registry
281280
)
282281
.unwrap();
283282

284283
let join_total_lookup_count = register_int_counter_vec_with_registry!(
285284
"stream_join_lookup_total_count",
286-
"Join executor lookup total count",
285+
"Join executor lookup total operation",
287286
&["actor_id", "side"],
288287
registry
289288
)
290289
.unwrap();
291290

292291
let join_insert_cache_miss_count = register_int_counter_vec_with_registry!(
293292
"stream_join_insert_cache_miss_count",
294-
"Count of cache miss when insert rows in join executor",
295-
&["actor_id", "side"],
296-
registry
297-
)
298-
.unwrap();
299-
300-
let join_may_exist_true_count = register_int_counter_vec_with_registry!(
301-
"stream_join_may_exist_true_count",
302-
"Count of may_exist's true returns of when insert rows in join executor",
293+
"Join executor cache miss when insert operation",
303294
&["actor_id", "side"],
304295
registry
305296
)
@@ -486,7 +477,6 @@ impl StreamingMetrics {
486477
join_lookup_miss_count,
487478
join_total_lookup_count,
488479
join_insert_cache_miss_count,
489-
join_may_exist_true_count,
490480
join_actor_input_waiting_duration_ns,
491481
join_match_duration_ns,
492482
join_barrier_align_duration,

0 commit comments

Comments
 (0)