@@ -18,6 +18,7 @@ use std::mem::size_of;
18
18
use std:: ops:: Range ;
19
19
20
20
use bytes:: { Buf , BufMut , Bytes , BytesMut } ;
21
+ use risingwave_common:: catalog:: TableId ;
21
22
use risingwave_hummock_sdk:: key:: FullKey ;
22
23
use risingwave_hummock_sdk:: KeyComparator ;
23
24
use { lz4, zstd} ;
@@ -142,13 +143,18 @@ pub struct Block {
142
143
pub data : Bytes ,
143
144
/// Uncompressed entried data length.
144
145
data_len : usize ,
146
+
147
+ /// Table id of this block.
148
+ table_id : TableId ,
149
+
145
150
/// Restart points.
146
151
restart_points : Vec < RestartPoint > ,
147
152
}
148
153
149
154
impl Block {
150
155
pub fn decode ( buf : Bytes , uncompressed_capacity : usize ) -> HummockResult < Self > {
151
156
// Verify checksum.
157
+
152
158
let xxhash64_checksum = ( & buf[ buf. len ( ) - 8 ..] ) . get_u64_le ( ) ;
153
159
xxhash64_verify ( & buf[ ..buf. len ( ) - 8 ] , xxhash64_checksum) ?;
154
160
@@ -184,11 +190,12 @@ impl Block {
184
190
}
185
191
186
192
pub fn decode_from_raw ( buf : Bytes ) -> Self {
193
+ let table_id = ( & buf[ buf. len ( ) - 4 ..] ) . get_u32_le ( ) ;
187
194
// decode restart_points_type_index
188
- let n_index = ( ( & buf[ buf. len ( ) - 4 .. ] ) . get_u32_le ( ) ) as usize ;
195
+ let n_index = ( ( & buf[ buf. len ( ) - 8 ..buf . len ( ) - 4 ] ) . get_u32_le ( ) ) as usize ;
189
196
let index_data_len = size_of :: < u32 > ( ) + n_index * RestartPoint :: size_of ( ) ;
190
- let data_len = buf. len ( ) - index_data_len;
191
- let mut restart_points_type_index_buf = & buf[ data_len..buf. len ( ) - 4 ] ;
197
+ let data_len = buf. len ( ) - 4 - index_data_len;
198
+ let mut restart_points_type_index_buf = & buf[ data_len..buf. len ( ) - 8 ] ;
192
199
193
200
let mut index_key_vec = Vec :: with_capacity ( n_index) ;
194
201
for _ in 0 ..n_index {
@@ -213,6 +220,7 @@ impl Block {
213
220
let mut restart_points_buf = & buf[ data_len..restarts_end] ;
214
221
215
222
let mut type_index: usize = 0 ;
223
+
216
224
for _ in 0 ..n_restarts {
217
225
let offset = restart_points_buf. get_u32_le ( ) ;
218
226
if type_index < index_key_vec. len ( ) - 1
@@ -232,6 +240,7 @@ impl Block {
232
240
data : buf,
233
241
data_len,
234
242
restart_points,
243
+ table_id : TableId :: new ( table_id) ,
235
244
}
236
245
}
237
246
@@ -243,7 +252,13 @@ impl Block {
243
252
}
244
253
245
254
pub fn capacity ( & self ) -> usize {
246
- self . data . len ( ) + self . restart_points . capacity ( ) * std:: mem:: size_of :: < u32 > ( )
255
+ self . data . len ( )
256
+ + self . restart_points . capacity ( ) * std:: mem:: size_of :: < u32 > ( )
257
+ + std:: mem:: size_of :: < u32 > ( )
258
+ }
259
+
260
+ pub fn table_id ( & self ) -> TableId {
261
+ self . table_id
247
262
}
248
263
249
264
/// Gets restart point by index.
@@ -385,6 +400,7 @@ pub struct BlockBuilder {
385
400
/// Compression algorithm.
386
401
compression_algorithm : CompressionAlgorithm ,
387
402
403
+ table_id : Option < u32 > ,
388
404
// restart_points_type_index stores only the restart_point corresponding to each type change,
389
405
// as an index, in order to reduce space usage
390
406
restart_points_type_index : Vec < RestartPoint > ,
@@ -402,6 +418,7 @@ impl BlockBuilder {
402
418
last_key : vec ! [ ] ,
403
419
entry_count : 0 ,
404
420
compression_algorithm : options. compression_algorithm ,
421
+ table_id : None ,
405
422
restart_points_type_index : Vec :: default ( ) ,
406
423
}
407
424
}
@@ -420,15 +437,20 @@ impl BlockBuilder {
420
437
///
421
438
/// Panic if key is not added in ASCEND order.
422
439
pub fn add ( & mut self , full_key : FullKey < & [ u8 ] > , value : & [ u8 ] ) {
440
+ let input_table_id = full_key. user_key . table_id . table_id ( ) ;
441
+ match self . table_id {
442
+ Some ( current_table_id) => debug_assert_eq ! ( current_table_id, input_table_id) ,
443
+ None => self . table_id = Some ( input_table_id) ,
444
+ }
423
445
#[ cfg( debug_assertions) ]
424
446
self . debug_valid ( ) ;
425
447
426
448
let mut key: BytesMut = Default :: default ( ) ;
427
- full_key. encode_into ( & mut key) ;
449
+ full_key. encode_into_without_table_id ( & mut key) ;
428
450
if self . entry_count > 0 {
429
451
debug_assert ! ( !key. is_empty( ) ) ;
430
452
debug_assert_eq ! (
431
- KeyComparator :: compare_encoded_full_key( & self . last_key[ ..] , & key) ,
453
+ KeyComparator :: compare_encoded_full_key( & self . last_key[ ..] , & key[ .. ] ) ,
432
454
Ordering :: Less
433
455
) ;
434
456
}
@@ -462,7 +484,7 @@ impl BlockBuilder {
462
484
463
485
key. as_ref ( )
464
486
} else {
465
- bytes_diff_below_max_key_length ( & self . last_key , & key)
487
+ bytes_diff_below_max_key_length ( & self . last_key , & key[ .. ] )
466
488
} ;
467
489
468
490
let prefix = KeyPrefix :: new_without_len (
@@ -492,6 +514,7 @@ impl BlockBuilder {
492
514
pub fn clear ( & mut self ) {
493
515
self . buf . clear ( ) ;
494
516
self . restart_points . clear ( ) ;
517
+ self . table_id = None ;
495
518
self . restart_points_type_index . clear ( ) ;
496
519
self . last_key . clear ( ) ;
497
520
self . entry_count = 0 ;
@@ -504,6 +527,7 @@ impl BlockBuilder {
504
527
+ ( RestartPoint :: size_of ( ) ) // (offset + len_type(u8)) * len
505
528
* self . restart_points_type_index . len ( )
506
529
+ std:: mem:: size_of :: < u32 > ( ) // restart_points_type_index len
530
+ + std:: mem:: size_of :: < u32 > ( ) // table_id len
507
531
}
508
532
509
533
/// Finishes building block.
@@ -545,6 +569,7 @@ impl BlockBuilder {
545
569
self . buf
546
570
. put_u32_le ( self . restart_points_type_index . len ( ) as u32 ) ;
547
571
572
+ self . buf . put_u32_le ( self . table_id . unwrap ( ) ) ;
548
573
match self . compression_algorithm {
549
574
CompressionAlgorithm :: None => ( ) ,
550
575
CompressionAlgorithm :: Lz4 => {
@@ -581,6 +606,7 @@ impl BlockBuilder {
581
606
self . compression_algorithm . encode ( & mut self . buf ) ;
582
607
let checksum = xxhash64_checksum ( & self . buf ) ;
583
608
self . buf . put_u64_le ( checksum) ;
609
+
584
610
self . buf . as_ref ( )
585
611
}
586
612
@@ -595,6 +621,7 @@ impl BlockBuilder {
595
621
+ std:: mem:: size_of :: < u32 > ( ) // restart_points_type_indics.len
596
622
+ std:: mem:: size_of :: < CompressionAlgorithm > ( ) // compression_algorithm
597
623
+ std:: mem:: size_of :: < u64 > ( ) // checksum
624
+ + std:: mem:: size_of :: < u32 > ( ) // table_id
598
625
}
599
626
600
627
pub fn debug_valid ( & self ) {
0 commit comments