feat(storage): add may_exist interface in state table #7489

Merged: 15 commits into main from patrick/storage-surely-not-have.pr on Feb 14, 2023

Conversation

hzxa21
Collaborator

@hzxa21 hzxa21 commented Jan 19, 2023

I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR adds the may_exist interface to both the local state store and the state table. Streaming executors will use it to check efficiently whether a given key range exists in storage. A typical use case is for hash join to check the existence of a join key before refilling its operator cache on write (#7393 (comment)).

Note that this interface can return false positives, but the false positive rate can be reduced by providing a prefix_hint in ReadOptions so that SST bloom filters are checked. In the current implementation, the state table always provides the prefix_hint in its may_exist implementation.

LocalStateStore:

    /// Check existence of a given `key_range`.
    /// It is better to provide `prefix_hint` in `read_options`, which will be used
    /// for checking bloom filter if hummock is used. If `prefix_hint` is not provided,
    /// the false positive rate can be significantly higher because bloom filter cannot
    /// be used.
    ///
    /// Returns:
    /// - false: `key_range` is guaranteed to be absent in storage.
    /// - true: `key_range` may or may not exist in storage.
    fn may_exist(
        &self,
        key_range: (Bound<Vec<u8>>, Bound<Vec<u8>>),
        read_options: ReadOptions,
    ) -> Self::MayExistFuture<'_>;

StateTable:

    /// Returns:
    /// false: the provided pk prefix is absent in state store.
    /// true: the provided pk prefix may or may not be present in state store.
    pub async fn may_exist(&self, pk_prefix: impl Row) -> StreamExecutorResult<bool> {
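
The contract above (false means guaranteed absent, true means "maybe") can be illustrated with a minimal sketch. `ToyStore` and its methods are hypothetical stand-ins, not RisingWave's `StateTable`; the filter here deliberately remembers only the first byte of each key so that false positives are easy to see.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical toy store (an assumption for illustration only).
struct ToyStore {
    data: BTreeMap<Vec<u8>, Vec<u8>>,
    /// Deliberately lossy filter: remembers only the first byte of each key.
    first_bytes: HashSet<u8>,
}

impl ToyStore {
    fn new() -> Self {
        Self { data: BTreeMap::new(), first_bytes: HashSet::new() }
    }

    fn insert(&mut self, key: &[u8], value: &[u8]) {
        if let Some(b) = key.first() {
            self.first_bytes.insert(*b);
        }
        self.data.insert(key.to_vec(), value.to_vec());
    }

    /// false: no key with this prefix exists.
    /// true: a key with this prefix may or may not exist.
    fn may_exist(&self, prefix: &[u8]) -> bool {
        match prefix.first() {
            Some(b) => self.first_bytes.contains(b),
            None => !self.data.is_empty(),
        }
    }

    /// Exact but more expensive check: scan the ordered map.
    fn scan_prefix(&self, prefix: &[u8]) -> usize {
        self.data
            .range(prefix.to_vec()..)
            .take_while(|(k, _)| k.starts_with(prefix))
            .count()
    }
}
```

With only `apple` stored, `may_exist(b"banana")` is false, while `may_exist(b"avocado")` is true even though `scan_prefix(b"avocado")` finds nothing. A caller therefore has to follow a `true` answer with a real read.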

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features).
  • All checks passed in ./risedev check (or alias, ./risedev c)

Contributor

@Li0k Li0k left a comment

Rest LGTM, thanks for the PR

@codecov

codecov bot commented Jan 19, 2023

Codecov Report

Merging #7489 (5d6af51) into main (9932864) will increase coverage by 0.03%.
The diff coverage is 81.99%.

@@            Coverage Diff             @@
##             main    #7489      +/-   ##
==========================================
+ Coverage   71.73%   71.76%   +0.03%     
==========================================
  Files        1113     1113              
  Lines      177058   177450     +392     
==========================================
+ Hits       127004   127341     +337     
- Misses      50054    50109      +55     
Flag Coverage Δ
rust 71.76% <81.99%> (+0.03%) ⬆️

Impacted Files Coverage Δ
src/storage/src/memory.rs 91.79% <0.00%> (-1.12%) ⬇️
src/storage/src/monitor/monitored_store.rs 4.48% <0.00%> (-0.38%) ⬇️
src/storage/src/panic_store.rs 0.00% <0.00%> (ø)
src/storage/src/store.rs 69.64% <ø> (ø)
src/storage/src/store_impl.rs 9.07% <0.00%> (-0.44%) ⬇️
src/storage/src/hummock/state_store_v1.rs 67.72% <42.10%> (+0.52%) ⬆️
src/storage/src/hummock/utils.rs 89.66% <92.00%> (-0.34%) ⬇️
src/stream/src/common/table/state_table.rs 82.54% <93.02%> (+0.53%) ⬆️
...e/src/hummock/shared_buffer/shared_buffer_batch.rs 90.96% <95.31%> (+0.51%) ⬆️
src/stream/src/common/table/test_state_table.rs 98.59% <97.68%> (-0.15%) ⬇️
... and 11 more

Contributor

@wenym1 wenym1 left a comment

Rest LGTM. Thanks for the PR!

table_id_label,
false,
);
local_stats.report(self.state_store_metrics.as_ref(), table_id_label);
Contributor

@wenym1 wenym1 Jan 19, 2023

Can we wrap local_stats in a guard that stores self.state_store_metrics and table_id_label, so that we don't have to remember to report manually before each return? As long as we impl Deref and DerefMut for the guard, we can use it the same way we use LocalStats.
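
A minimal sketch of the guard idea, assuming simplified stand-ins: `LocalStats`, `Metrics`, and `LocalStatsGuard` below are hypothetical types with made-up fields, not the real RisingWave definitions. The guard reports in `Drop`, so every return path reports exactly once.

```rust
use std::cell::Cell;
use std::ops::{Deref, DerefMut};

/// Hypothetical stand-in for the real `LocalStats`.
#[derive(Default)]
struct LocalStats {
    bloom_filter_checks: u64,
}

/// Hypothetical stand-in for the state store metrics.
#[derive(Default)]
struct Metrics {
    reported_checks: Cell<u64>,
}

impl LocalStats {
    fn report(&self, metrics: &Metrics, _table_id_label: &str) {
        let total = metrics.reported_checks.get() + self.bloom_filter_checks;
        metrics.reported_checks.set(total);
    }
}

/// Owns the stats and remembers where to report them.
struct LocalStatsGuard<'a> {
    stats: LocalStats,
    metrics: &'a Metrics,
    table_id_label: &'a str,
}

impl<'a> LocalStatsGuard<'a> {
    fn new(metrics: &'a Metrics, table_id_label: &'a str) -> Self {
        Self { stats: LocalStats::default(), metrics, table_id_label }
    }
}

impl Deref for LocalStatsGuard<'_> {
    type Target = LocalStats;
    fn deref(&self) -> &LocalStats {
        &self.stats
    }
}

impl DerefMut for LocalStatsGuard<'_> {
    fn deref_mut(&mut self) -> &mut LocalStats {
        &mut self.stats
    }
}

impl Drop for LocalStatsGuard<'_> {
    /// Reporting happens here, on every exit path.
    fn drop(&mut self) {
        self.stats.report(self.metrics, self.table_id_label);
    }
}

/// Early returns no longer need a manual `report` call.
fn read_with_early_return(metrics: &Metrics, found_early: bool) -> bool {
    let mut stats = LocalStatsGuard::new(metrics, "table-1");
    stats.bloom_filter_checks += 1;
    if found_early {
        return true; // the guard is dropped and reports here
    }
    stats.bloom_filter_checks += 1;
    false
}
```

The trade-off is that reporting becomes implicit, which is convenient for early returns but makes it harder to report different values in different branches.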

Contributor

Maybe we can implement this feature and replace all usages in another PR.

Contributor

For report_bloom_filter_metrics, different parameters are returned in different branches, so I am not sure it is easy to modify. We can discuss it in the next PR.

Contributor

@KeXiangWang KeXiangWang left a comment

LGTM

@BugenZhao
Member

May I ask what the cost of calling this function is? IIUC, it needs to collect version info and fetch the metadata for SSTs in the range.

@Little-Wallace
Contributor

How about renaming it to may_exist? We always act on the false value of surely_not_have. In most cases we write code like this:

if !surely_not_have(xxx) {
    // do some read operation.
}

So why not flip it to a positive value, so that we can write:

if may_exist(xxx) {
    // do some read.
}


pub async fn surely_not_have(
&self,
key_range: (Bound<Vec<u8>>, Bound<Vec<u8>>),
Contributor

Why do we still need a key_range rather than a prefix?
I think this interface can only be used with a prefix. We cannot judge whether any value exists within an arbitrary range.

Contributor

It seems that the bloom filter is not the only filter used in this method. Before fetching the SST meta to get the bloom filter, we can filter by SST min-max against the key range.

Contributor

state_store could convert the prefix to a range by itself. I think we should expose as few parameters as possible, so that the upper layer cannot pass a wrong value.

Contributor

In our current code, all upper layer streaming executors are accessing storage state via StateTable, and it already provides a clean API, which only passes a pk_prefix, and it is already doing the work of converting the prefix to key range.

On the other hand, no upper layer code of streaming executor is accessing StateStore directly, and we don't have to worry about misuse from the streaming executor side. Instead, I think we should provide better flexibility in the StateStore so that in the future when we want to support filtering other than the prefix filter in StateTable, we can easily reuse the current StateStore API without modifying it.

Currently we do two kinds of filtering in our code, min-max and bloom filter. They are independent of each other, so it is more flexible to pass the two filtering conditions (key range and bloom filter hint) as separate parameters. Passing only a prefix would introduce the assumption that all tables calling this method use the prefix as the bloom filter key. However, some tables do not use the prefix as the bloom filter key.
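
To make the argument for separate parameters concrete, here is a minimal sketch under stated assumptions. `SstSummary` is a hypothetical type, and a `HashSet` of bloom filter keys stands in for a real bloom filter (so it has no false positives of its own). The min-max check uses only the key range; the bloom check uses only the hint.

```rust
use std::collections::HashSet;
use std::ops::Bound;

type KeyRange = (Bound<Vec<u8>>, Bound<Vec<u8>>);

/// Hypothetical SST summary: key bounds plus a set standing in for a bloom filter.
struct SstSummary {
    smallest_key: Vec<u8>,
    largest_key: Vec<u8>,
    bloom_keys: HashSet<Vec<u8>>,
}

/// Min-max filter: can [smallest_key, largest_key] overlap the requested range?
fn overlaps(sst: &SstSummary, range: &KeyRange) -> bool {
    let above_start = match &range.0 {
        Bound::Included(start) => sst.largest_key >= *start,
        Bound::Excluded(start) => sst.largest_key > *start,
        Bound::Unbounded => true,
    };
    let below_end = match &range.1 {
        Bound::Included(end) => sst.smallest_key <= *end,
        Bound::Excluded(end) => sst.smallest_key < *end,
        Bound::Unbounded => true,
    };
    above_start && below_end
}

/// The two filters take independent inputs: `range` and `bloom_hint`.
/// Without a hint, only the min-max filter can rule an SST out.
fn may_exist(ssts: &[SstSummary], range: &KeyRange, bloom_hint: Option<&[u8]>) -> bool {
    ssts.iter().any(|sst| {
        overlaps(sst, range)
            && bloom_hint.map_or(true, |hint| sst.bloom_keys.contains(hint))
    })
}
```

For an SST spanning `k1-a` to `k3-z` with bloom keys `k1` and `k3`, a query for the range of prefix `k2` passes the min-max filter and returns true without a hint, but returns false once the hint `k2` is supplied.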

Contributor

@wcy-fdu wcy-fdu Jan 28, 2023

However, for some tables, they do not use the prefix as the bloom filter key.

Can you give some specific examples?

Contributor

Can you give some specific examples?

Did we previously have a plan to turn the prefix_hint into bloom_filter_hint in the ReadOptions so that we are not using a key prefix as the bloom filter key? IIRC, the demand for this feature comes from the earlier design to support watermark, which puts the timestamp column at the beginning of key encoding to support state clean by range delete, but build the bloom filter with the following distribution key.

Contributor

For the watermark issue, we finally decided to decouple the bloom filter key from the distribution key. Now bloom_filter_hint/bloom_filter_key is always pk_prefix, and all streaming reads will hit the bloom filter.

Contributor

If we have decoupled bloom filter key and distribution key, I think we should pass the bloom filter key and the key range separately in the StateStore API 🤔. The StateTable will be responsible for calculating the bloom filter key according to the table config. Otherwise, they will be coupled again.

Collaborator Author

@hzxa21 hzxa21 Feb 7, 2023

The reasoning in the earlier comment in this thread (executors only reach storage through StateTable, and the min-max and bloom filter checks are independent, so the key range and the bloom filter hint are passed as separate parameters) is exactly why I put key_range in this new interface.

pub async fn surely_not_have(&self, pk_prefix: impl Row) -> StreamExecutorResult<bool> {
let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
let encoded_key_range = range_of_prefix(&encoded_prefix);
Contributor

As in my last comment, we only need to pass encoded_prefix to state_store, so that we avoid another encoding step, which causes an extra allocation and memory copy.

Contributor

Maybe we can make the passed key range (Bound<Bytes>, Bound<Bytes>); then turning a prefix into the key range only requires a shallow copy, with no allocation or memcpy.

Contributor

Performance is not the main concern. I think passing the prefix is more direct, since we only support checking whether data with this prefix exists in the state store. That is clearer for the method surely_not_have.
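
For readers following the prefix-versus-range discussion, here is a minimal sketch of how a prefix can be turned into a key range. `prefix_to_range` is a hypothetical helper written for illustration; the `range_of_prefix` function used in the PR may be implemented differently. It uses `Vec<u8>`, so each bound is a fresh allocation, which is the cost mentioned in this thread.

```rust
use std::ops::Bound;

/// Hypothetical helper: the range that covers exactly the keys starting with `prefix`.
fn prefix_to_range(prefix: &[u8]) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
    if prefix.is_empty() {
        // Every key starts with the empty prefix.
        return (Bound::Unbounded, Bound::Unbounded);
    }
    let start = Bound::Included(prefix.to_vec());
    // The exclusive end is the smallest key greater than all keys with this prefix:
    // drop trailing 0xff bytes, then increment the last remaining byte.
    let mut end = prefix.to_vec();
    while let Some(&last) = end.last() {
        if last == u8::MAX {
            end.pop();
        } else {
            *end.last_mut().unwrap() = last + 1;
            return (start, Bound::Excluded(end));
        }
    }
    // The prefix was all 0xff bytes, so there is no finite upper bound.
    (start, Bound::Unbounded)
}
```

For example, the prefix `[1, 2]` maps to the range from `[1, 2]` inclusive to `[1, 3]` exclusive, and `[1, 0xff]` maps to an exclusive end of `[2]`.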

Contributor

@wcy-fdu wcy-fdu left a comment

LGTM for the state table part. I'm curious about which executors will use this interface.

FYI, in the materialize executor we do a pk conflict check to constrain the input pk. This accesses storage frequently, so we maintain a cache. I am not sure whether this interface will optimize it.

@wenym1
Contributor

wenym1 commented Jan 28, 2023

I'm curious about which executors will use this interface.

In hash join, we can refill the operator cache without iterating over the pk_prefix when we know that the key is definitely not in storage. Without this method, if we still want to refill the operator cache, we have to iterate over the pk_prefix every time we get a new key.
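
The cache refill pattern described here can be sketched as follows. `ToyStorage` and `refill_cache` are hypothetical names, and the stand-in `may_exist` is exact, whereas the real interface may return false positives (which would only cost an extra scan, never a wrong result).

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical storage stand-in that counts how many prefix scans are issued.
struct ToyStorage {
    /// (join_key, row_id) -> row
    rows: BTreeMap<(u32, u32), String>,
    scans: usize,
}

impl ToyStorage {
    /// Stand-in for `may_exist`. Exact here; the real one may say true for absent keys.
    fn may_exist(&self, join_key: u32) -> bool {
        self.rows
            .range((join_key, 0)..=(join_key, u32::MAX))
            .next()
            .is_some()
    }

    /// The expensive path: iterate over all rows with this join key.
    fn scan(&mut self, join_key: u32) -> Vec<String> {
        self.scans += 1;
        self.rows
            .range((join_key, 0)..=(join_key, u32::MAX))
            .map(|(_, row)| row.clone())
            .collect()
    }
}

/// On a cache miss, skip the scan when storage surely has no rows for the key.
fn refill_cache(
    cache: &mut HashMap<u32, Vec<String>>,
    storage: &mut ToyStorage,
    join_key: u32,
) {
    if cache.contains_key(&join_key) {
        return;
    }
    let rows = if storage.may_exist(join_key) {
        storage.scan(join_key)
    } else {
        Vec::new() // guaranteed absent: cache an empty entry without scanning
    };
    cache.insert(join_key, rows);
}
```

Refilling for a join key that has no rows inserts an empty cache entry and leaves `scans` at zero; only keys that may exist pay for the scan.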

@KeXiangWang
Contributor

Can we also add metrics for surely_not_have? For example, Read Duration - Surely_Not_Have, just like the Read Duration-Iter and Read Duration-Get metrics.

@hzxa21
Collaborator Author

hzxa21 commented Feb 3, 2023

May I ask what the cost of calling this function is? IIUC, it needs to collect version info and fetch the metadata for SSTs in the range.

There are three contributing factors to the cost of this function:

  1. SST pruning with key range and table id. This pruning is done by checking version, which is cached in the CN.
  2. Fetch bloom filters of the pruned SSTs. This requires fetching SST meta from meta cache and remote I/O can happen if there is a cache miss. Based on the previous benchmark result done with @KeXiangWang, the meta cache miss rate is low.
  3. Hash and check bloom filter. Mainly CPU costs.
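
Steps 2 and 3 above can be sketched roughly as follows, assuming the SSTs have already been pruned in step 1. Everything here is hypothetical: `MetaCache`, the 64-bit single-hash filter, and the `remote` map that stands in for object storage. The sketch only shows where the cache miss (potential remote I/O) and the CPU-bound hashing occur.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

const FILTER_BITS: u64 = 64;

/// Step 3 (CPU): hash the key to a bit position in a 64-bit filter.
fn bit_for(key: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % FILTER_BITS
}

/// Build a toy single-hash bloom filter over the given keys.
fn build_filter(keys: &[&[u8]]) -> u64 {
    keys.iter().fold(0u64, |bits, key| bits | (1u64 << bit_for(key)))
}

/// Hypothetical meta cache: SST id -> bloom filter bits.
struct MetaCache {
    filters: HashMap<u64, u64>,
    misses: usize,
}

impl MetaCache {
    /// Step 2: fetch the filter, going "remote" only on a cache miss.
    fn filter(&mut self, sst_id: u64, remote: &HashMap<u64, u64>) -> u64 {
        if let Some(bits) = self.filters.get(&sst_id) {
            return *bits;
        }
        self.misses += 1; // in a real system this is where remote I/O would happen
        // If no filter is available, conservatively treat every bit as set.
        let bits = remote.get(&sst_id).copied().unwrap_or(u64::MAX);
        self.filters.insert(sst_id, bits);
        bits
    }
}

/// `pruned_ssts` is assumed to be the output of step 1 (range and table id pruning).
fn may_exist(
    cache: &mut MetaCache,
    remote: &HashMap<u64, u64>,
    pruned_ssts: &[u64],
    hint: &[u8],
) -> bool {
    let mask = 1u64 << bit_for(hint);
    pruned_ssts
        .iter()
        .any(|id| (cache.filter(*id, remote) & mask) != 0)
}
```

Calling `may_exist` twice against the same SST increments `misses` only once, which mirrors the observation that the cost of step 2 depends mostly on the meta cache miss rate.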

@hzxa21 hzxa21 force-pushed the patrick/storage-surely-not-have.pr branch from 8c67631 to 85869dc Compare February 7, 2023 10:17
@hzxa21 hzxa21 changed the title feat(storage): add surely_not_have interface in state table feat(storage): add may_exist interface in state table Feb 7, 2023
@hzxa21
Collaborator Author

hzxa21 commented Feb 7, 2023

@Little-Wallace @KeXiangWang I have renamed the method to may_exist and added the duration metrics. PTAL.

Contributor

@KeXiangWang KeXiangWang left a comment

There are still some surely_not_have left. We'd better make sure all of them are updated before merging. LGTM for the rest.


self.state_store_metrics
.iter_merge_sstable_counts
.with_label_values(&[table_id_label, "surely-not-have"])
Contributor

rename to may_exist

@hzxa21 hzxa21 force-pushed the patrick/storage-surely-not-have.pr branch from 1956181 to 123d9c0 Compare February 13, 2023 07:16
Contributor

@Little-Wallace Little-Wallace left a comment

LGTM

@mergify mergify bot merged commit fa34964 into main Feb 14, 2023
@mergify mergify bot deleted the patrick/storage-surely-not-have.pr branch February 14, 2023 06:05
stdrc pushed a commit that referenced this pull request Feb 14, 2023
Approved-By: Li0k
Approved-By: wenym1
Approved-By: KeXiangWang
Approved-By: wcy-fdu
Approved-By: Little-Wallace
Labels
type/feature Type: New feature.
7 participants