-
Notifications
You must be signed in to change notification settings - Fork 646
feat(storage): add may_exist interface in state table #7489
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM, thanks for the PR
Codecov Report
@@ Coverage Diff @@
## main #7489 +/- ##
==========================================
+ Coverage 71.73% 71.76% +0.03%
==========================================
Files 1113 1113
Lines 177058 177450 +392
==========================================
+ Hits 127004 127341 +337
- Misses 50054 50109 +55
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM. Thanks for the PR!
table_id_label, | ||
false, | ||
); | ||
local_stats.report(self.state_store_metrics.as_ref(), table_id_label); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we wrap the local_stats
with a guard that stores the self.state_store_metrics
and table_id_label
so that we don't have to pay attention to manually reporting before returning. As long as we impl Deref
and DerefMut
for the guard, we can use the guard the same way we use the LocalStats
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can implement this feature and replace all usages in another PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For report_bloom_filter_metrics
, different parameters are returned in different branches, I am not sure if it is easy to modify, we can discuss it in the next pr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
May I ask what the cost of calling this function is? IIUC, it needs to collect version info and fetch the metadata for SSTs in the range. |
How about rename it to if !surely_not_have(xxx) {
// do some read operation.
} so why not change it to a positive value. so that we can write it with: if may_exist(xxx) {
// do some read.
} |
|
||
pub async fn surely_not_have( | ||
&self, | ||
key_range: (Bound<Vec<u8>>, Bound<Vec<u8>>), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we still need a key_range
rather than a prefix.
I think this interface could only be used in some prefix value. We can not judge whether there exist value between some range.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems that we are not only using the bloom filter to filter the key in this method. Before fetching the sst meta to get the bloom filter, we can filter by sst min-max with the key range.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
state_store
could convert prefix
to a range
by itself. I think we shall support parameters as less as possible to avoid a wrong variable pass by upper layer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In our current code, all upper layer streaming executors are accessing storage state via StateTable
, and it already provides a clean API, which only passes a pk_prefix, and it is already doing the work of converting the prefix to key range.
On the other hand, no upper layer code of streaming executor is accessing StateStore
directly, and we don't have to worry about misuse from the streaming executor side. Instead, I think we should provide better flexibility in the StateStore
so that in the future when we want to support filtering other than the prefix filter in StateTable
, we can easily reuse the current StateStore
API without modifying it.
Currently we are doing two kinds of filtering in our code, min-max and bloom filter, and they are independent to each other, and therefore it is more flexible to pass the two filtering condition (key range and bloom filter hint) in separate parameters. Passing a prefix solely will introduce the assumption that, all tables that call this method are using prefix as the bloom filter key. However, for some tables, they do not use the prefix as the bloom filter key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, for some tables, they do not use the prefix as the bloom filter key.
Can you give some specific examples?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you give some specific examples?
Did we previously have a plan to turn the prefix_hint
into bloom_filter_hint
in the ReadOptions
so that we are not using a key prefix as the bloom filter key? IIRC, the demand for this feature comes from the earlier design to support watermark, which puts the timestamp column at the beginning of key encoding to support state clean by range delete, but build the bloom filter with the following distribution key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For watermark issue, we finally decide to decouple bloom filter key and distribution key, and now bloom_filter_hint
/bloom_filter_key
is always pk_prefix
, and all streaming read will hit bloom filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have decoupled bloom filter key and distribution key, I think we should pass the bloom filter key and the key range separately in the StateStore
API 🤔. The StateTable
will be responsible for calculating the bloom filter key according to the table config. Otherwise, they will be coupled again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In our current code, all upper layer streaming executors are accessing storage state via
StateTable
, and it already provides a clean API, which only passes a pk_prefix, and it is already doing the work of converting the prefix to key range.On the other hand, no upper layer code of streaming executor is accessing
StateStore
directly, and we don't have to worry about misuse from the streaming executor side. Instead, I think we should provide better flexibility in theStateStore
so that in the future when we want to support filtering other than the prefix filter inStateTable
, we can easily reuse the currentStateStore
API without modifying it.Currently we are doing two kinds of filtering in our code, min-max and bloom filter, and they are independent to each other, and therefore it is more flexible to pass the two filtering condition (key range and bloom filter hint) in separate parameters. Passing a prefix solely will introduce the assumption that, all tables that call this method are using prefix as the bloom filter key. However, for some tables, they do not use the prefix as the bloom filter key.
This is exactly the reason why I put key_range
in this new interface.
pub async fn surely_not_have(&self, pk_prefix: impl Row) -> StreamExecutorResult<bool> { | ||
let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); | ||
let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); | ||
let encoded_key_range = range_of_prefix(&encoded_prefix); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I last comment, we only need to pass encoded_prefix
to state_store
so that we can avoid encode which cause another allocation and memory copy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can make the passed key range to be (Bound<Bytes>, Bound<Bytes>)
, and then turning a prefix into the key range will only cause shallow copy and no allocation and memcpy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The performance is not the main concern point. But I think pass prefix is directly and we only support to check whether data with this prefix exists in state-store. It is clear in for the method surely_not_have
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM to state table part, and I'm curious about which executors will use this interface.
FYI, in materialize executor, we do pk conflict check to constraint input pk, but this will frequently accessed storage so we maintain a cache, not sure whether this interface will optimize it.
In hash join we can refill the operator cache without |
Can we also add metrics for |
There are three contributing factors to the cost of this function:
|
8c67631
to
85869dc
Compare
@Little-Wallace @KeXiangWang I have renamed the method to may_exist and added the duration metrics. PTAL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are still some surely_not_have
left. We'd better make sure all of them are updated before merging. LGTM for the rest.
|
||
self.state_store_metrics | ||
.iter_merge_sstable_counts | ||
.with_label_values(&[table_id_label, "surely-not-have"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to may_exist
1956181
to
123d9c0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This PR adds the `may_exist` interface in both local state store and state table, which will be used by the streaming executor to check the existence of a given key range in storage efficiently. A typical use case is for hash join to check existence of a join key before refilling its operator cache on write (#7393 (comment)) Note that there can be false positive when using this interface but the false positive rate can be improved by providing a `prefix_hint` in ReadOptions to check SST bloom filters. In the current implementation, state table will always provide the `prefix_hint` in its `may_exist` implementation. LocalStateStore: ```rust /// Check existence of a given `key_range`. /// It is better to provide `prefix_hint` in `read_options`, which will be used /// for checking bloom filter if hummock is used. If `prefix_hint` is not provided, /// the false positive rate can be significantly higher because bloom filter cannot /// be used. /// /// Returns: /// - false: `key_range` is guaranteed to be absent in storage. /// - true: `key_range` may or may not exist in storage. fn may_exist( &self, key_range: (Bound<Vec<u8>>, Bound<Vec<u8>>), read_options: ReadOptions, ) -> Self::MayExistFuture<'_>; ``` StateTable: ```rust /// Returns: /// false: the provided pk prefix is absent in state store. /// true: the provided pk prefix may or may not be present in state store. pub async fn may_exist(&self, pk_prefix: impl Row) -> StreamExecutorResult<bool> { ``` Approved-By: Li0k Approved-By: wenym1 Approved-By: KeXiangWang Approved-By: wcy-fdu Approved-By: Little-Wallace
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR adds the
may_exist
interface in both local state store and state table, which will be used by the streaming executor to check the existence of a given key range in storage efficiently. A typical use case is for hash join to check existence of a join key before refilling its operator cache on write (#7393 (comment))Note that there can be false positive when using this interface but the false positive rate can be improved by providing a
prefix_hint
in ReadOptions to check SST bloom filters. In the current implementation, state table will always provide theprefix_hint
in itsmay_exist
implementation.LocalStateStore:
StateTable:
Checklist
./risedev check
(or alias,./risedev c
)Documentation
If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.
Types of user-facing changes
Please keep the types that apply to your changes, and remove those that do not apply.
Release note
Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.
Refer to a related PR or issue link (optional)