Skip to content

Commit be9723e

Browse files
fix(state clean): state clean should not delete NULL (risingwavelabs#8737)
1 parent 4b03a93 commit be9723e

File tree

1 file changed

+11
-5
lines changed

1 file changed

+11
-5
lines changed

src/stream/src/common/table/state_table.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -808,22 +808,28 @@ where
808808
} else {
809809
Some(self.pk_serde.prefix(1))
810810
};
811-
let range_end_suffix = watermark.map(|watermark| {
811+
let watermark_suffix = watermark.map(|watermark| {
812812
serialize_pk(
813813
row::once(Some(watermark)),
814814
prefix_serializer.as_ref().unwrap(),
815815
)
816816
});
817-
if let Some(range_end_suffix) = range_end_suffix {
818-
let range_begin_suffix = vec![];
819-
trace!(table_id = %self.table_id, range_end = ?range_end_suffix, vnodes = ?{
817+
if let Some(watermark_suffix) = watermark_suffix {
818+
// We either serialize null into `0u8`, data into `(1u8 || scalar)`, or serialize null
819+
// into `1u8`, data into `(0u8 || scalar)`. We do not want to delete null
820+
// here, so `range_begin_suffix` cannot be `vec![]` when null is represented as `0u8`.
821+
let range_begin_suffix = watermark_suffix
822+
.first()
823+
.map(|bit| vec![*bit])
824+
.unwrap_or_default();
825+
trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
820826
self.vnodes.iter_vnodes().collect_vec()
821827
}, "delete range");
822828
for vnode in self.vnodes.iter_vnodes() {
823829
let mut range_begin = vnode.to_be_bytes().to_vec();
824830
let mut range_end = range_begin.clone();
825831
range_begin.extend(&range_begin_suffix);
826-
range_end.extend(&range_end_suffix);
832+
range_end.extend(&watermark_suffix);
827833
delete_ranges.push((Bytes::from(range_begin), Bytes::from(range_end)));
828834
}
829835
}

0 commit comments

Comments
 (0)