Skip to content

revert: feat(streaming): call may_exist when insert cache miss in join executor (#7957) #8655

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,11 +1378,7 @@ def section_streaming_actors(outer_panels):
),
panels.target(
f"rate({metric('stream_join_insert_cache_miss_count')}[$__rate_interval])",
"cache miss when insert {{actor_id}} {{side}}",
),
panels.target(
f"rate({metric('stream_join_may_exist_true_count')}[$__rate_interval])",
"may_exist true when insert {{actor_id}} {{side}}",
"cache miss when insert{{actor_id}} {{side}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

30 changes: 1 addition & 29 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,31 +315,6 @@ where
Distribution::fallback(),
None,
false,
0,
)
.await
}

/// Create a state table without distribution, with given `prefix_hint_len`, used for unit
/// tests.
pub async fn new_without_distribution_with_prefix_hint_len(
store: S,
table_id: TableId,
columns: Vec<ColumnDesc>,
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
prefix_hint_len: usize,
) -> Self {
Self::new_with_distribution_inner(
store,
table_id,
columns,
order_types,
pk_indices,
Distribution::fallback(),
None,
true,
prefix_hint_len,
)
.await
}
Expand All @@ -364,7 +339,6 @@ where
distribution,
value_indices,
true,
0,
)
.await
}
Expand All @@ -387,7 +361,6 @@ where
distribution,
value_indices,
false,
0,
)
.await
}
Expand All @@ -405,7 +378,6 @@ where
}: Distribution,
value_indices: Option<Vec<usize>>,
is_consistent_op: bool,
prefix_hint_len: usize,
) -> Self {
let local_state_store = store
.new_local(NewLocalOptions {
Expand Down Expand Up @@ -446,7 +418,7 @@ where
row_serde: SD::new(&column_ids, Arc::from(data_types.into_boxed_slice())),
pk_indices,
dist_key_in_pk_indices,
prefix_hint_len,
prefix_hint_len: 0,
vnodes,
table_option: Default::default(),
vnode_col_idx_in_pk: None,
Expand Down
18 changes: 5 additions & 13 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,20 +1047,18 @@ mod tests {
order_types: &[OrderType],
pk_indices: &[usize],
table_id: u32,
prefix_hint_len: usize,
) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
let column_descs = data_types
.iter()
.enumerate()
.map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
.collect_vec();
let state_table = StateTable::new_without_distribution_with_prefix_hint_len(
let state_table = StateTable::new_without_distribution(
mem_state.clone(),
TableId::new(table_id),
column_descs,
order_types.to_vec(),
pk_indices.to_vec(),
prefix_hint_len,
)
.await;

Expand Down Expand Up @@ -1111,9 +1109,8 @@ mod tests {
};
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![1]);
let (tx_r, source_r) = MockSource::channel(schema, vec![1]);
let join_key_indices = vec![0];
let params_l = JoinParams::new(join_key_indices.clone(), vec![1]);
let params_r = JoinParams::new(join_key_indices.clone(), vec![1]);
let params_l = JoinParams::new(vec![0], vec![1]);
let params_r = JoinParams::new(vec![0], vec![1]);
let cond = with_condition.then(create_cond);

let mem_state = MemoryStateStore::new();
Expand All @@ -1124,7 +1121,6 @@ mod tests {
&[OrderType::ascending(), OrderType::ascending()],
&[0, 1],
0,
join_key_indices.len(),
)
.await;

Expand All @@ -1134,7 +1130,6 @@ mod tests {
&[OrderType::ascending(), OrderType::ascending()],
&[0, 1],
2,
join_key_indices.len(),
)
.await;

Expand Down Expand Up @@ -1180,9 +1175,8 @@ mod tests {
};
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![0]);
let (tx_r, source_r) = MockSource::channel(schema, vec![0]);
let join_key_indices = vec![0, 1];
let params_l = JoinParams::new(join_key_indices.clone(), vec![]);
let params_r = JoinParams::new(join_key_indices.clone(), vec![]);
let params_l = JoinParams::new(vec![0, 1], vec![]);
let params_r = JoinParams::new(vec![0, 1], vec![]);
let cond = with_condition.then(create_cond);

let mem_state = MemoryStateStore::new();
Expand All @@ -1197,7 +1191,6 @@ mod tests {
],
&[0, 1, 0],
0,
join_key_indices.len(),
)
.await;

Expand All @@ -1211,7 +1204,6 @@ mod tests {
],
&[0, 1, 1],
0,
join_key_indices.len(),
)
.await;
let schema_len = match T {
Expand Down
34 changes: 3 additions & 31 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ pub struct JoinHashMapMetrics {
total_lookup_count: usize,
/// How many times have we miss the cache when insert row
insert_cache_miss_count: usize,
may_exist_true_count: usize,
}

impl JoinHashMapMetrics {
Expand All @@ -173,7 +172,6 @@ impl JoinHashMapMetrics {
lookup_miss_count: 0,
total_lookup_count: 0,
insert_cache_miss_count: 0,
may_exist_true_count: 0,
}
}

Expand All @@ -190,14 +188,9 @@ impl JoinHashMapMetrics {
.join_insert_cache_miss_count
.with_label_values(&[&self.actor_id, self.side])
.inc_by(self.insert_cache_miss_count as u64);
self.metrics
.join_may_exist_true_count
.with_label_values(&[&self.actor_id, self.side])
.inc_by(self.may_exist_true_count as u64);
self.total_lookup_count = 0;
self.lookup_miss_count = 0;
self.insert_cache_miss_count = 0;
self.may_exist_true_count = 0;
}
}

Expand Down Expand Up @@ -435,22 +428,11 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
// Update cache
entry.insert(pk, value.encode());
} else if self.pk_contained_in_jk {
// Refill cache when the join key contains primary key.
// Refill cache when the join key exist in neither cache or storage.
self.metrics.insert_cache_miss_count += 1;
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
self.update_state(key, state.into());
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exists in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
self.update_state(key, state.into());
} else {
self.metrics.may_exist_true_count += 1;
}
}

// Update the flush buffer.
Expand All @@ -462,6 +444,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {

/// Insert a row.
/// Used when the side does not need to update degree.
#[allow(clippy::unused_async)]
pub async fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> {
let join_row = JoinRow::new(&value, 0);
let pk = (&value)
Expand All @@ -471,22 +454,11 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
// Update cache
entry.insert(pk, join_row.encode());
} else if self.pk_contained_in_jk {
// Refill cache when the join key contains primary key.
// Refill cache when the join key exist in neither cache or storage.
self.metrics.insert_cache_miss_count += 1;
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
self.update_state(key, state.into());
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exists in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
self.update_state(key, state.into());
} else {
self.metrics.may_exist_true_count += 1;
}
}

// Update the flush buffer.
Expand Down
16 changes: 3 additions & 13 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub struct StreamingMetrics {
pub join_lookup_miss_count: GenericCounterVec<AtomicU64>,
pub join_total_lookup_count: GenericCounterVec<AtomicU64>,
pub join_insert_cache_miss_count: GenericCounterVec<AtomicU64>,
pub join_may_exist_true_count: GenericCounterVec<AtomicU64>,
pub join_actor_input_waiting_duration_ns: GenericCounterVec<AtomicU64>,
pub join_match_duration_ns: GenericCounterVec<AtomicU64>,
pub join_barrier_align_duration: HistogramVec,
Expand Down Expand Up @@ -275,31 +274,23 @@ impl StreamingMetrics {

let join_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_miss_count",
"Join executor lookup miss count",
"Join executor lookup miss duration",
&["actor_id", "side"],
registry
)
.unwrap();

let join_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_total_count",
"Join executor lookup total count",
"Join executor lookup total operation",
&["actor_id", "side"],
registry
)
.unwrap();

let join_insert_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_join_insert_cache_miss_count",
"Count of cache miss when insert rows in join executor",
&["actor_id", "side"],
registry
)
.unwrap();

let join_may_exist_true_count = register_int_counter_vec_with_registry!(
"stream_join_may_exist_true_count",
"Count of may_exist's true returns of when insert rows in join executor",
"Join executor cache miss when insert operation",
&["actor_id", "side"],
registry
)
Expand Down Expand Up @@ -486,7 +477,6 @@ impl StreamingMetrics {
join_lookup_miss_count,
join_total_lookup_count,
join_insert_cache_miss_count,
join_may_exist_true_count,
join_actor_input_waiting_duration_ns,
join_match_duration_ns,
join_barrier_align_duration,
Expand Down