@@ -83,6 +83,7 @@ pub struct SstableBuilderOutput<WO> {
83
83
pub writer_output : WO ,
84
84
pub avg_key_size : usize ,
85
85
pub avg_value_size : usize ,
86
+ pub epoch_count : usize ,
86
87
}
87
88
88
89
pub struct SstableBuilder < W : SstableWriter , F : FilterBuilder > {
@@ -119,8 +120,7 @@ pub struct SstableBuilder<W: SstableWriter, F: FilterBuilder> {
119
120
120
121
filter_builder : F ,
121
122
122
- min_epoch : u64 ,
123
- max_epoch : u64 ,
123
+ epoch_set : BTreeSet < u64 > ,
124
124
}
125
125
126
126
impl < W : SstableWriter > SstableBuilder < W , XorFilterBuilder > {
@@ -168,8 +168,7 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
168
168
total_key_count : 0 ,
169
169
table_stats : Default :: default ( ) ,
170
170
last_table_stats : Default :: default ( ) ,
171
- min_epoch : u64:: MAX ,
172
- max_epoch : u64:: MIN ,
171
+ epoch_set : BTreeSet :: default ( ) ,
173
172
}
174
173
}
175
174
@@ -238,6 +237,8 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
238
237
self . total_key_count += 1 ;
239
238
self . last_table_stats . total_key_count += 1 ;
240
239
240
+ self . epoch_set . insert ( full_key. epoch ) ;
241
+
241
242
if is_new_table && !self . block_builder . is_empty ( ) {
242
243
self . build_block ( ) . await ?;
243
244
}
@@ -263,9 +264,6 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
263
264
self . raw_key . clear ( ) ;
264
265
self . raw_value . clear ( ) ;
265
266
266
- self . min_epoch = cmp:: min ( self . min_epoch , full_key. epoch ) ;
267
- self . max_epoch = cmp:: max ( self . max_epoch , full_key. epoch ) ;
268
-
269
267
if self . block_builder . approximate_len ( ) >= self . options . block_capacity {
270
268
self . build_block ( ) . await ?;
271
269
}
@@ -363,6 +361,48 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
363
361
( tombstone_min_epoch, tombstone_max_epoch)
364
362
} ;
365
363
364
+ let ( avg_key_size, avg_value_size) = if self . table_stats . is_empty ( ) {
365
+ ( 0 , 0 )
366
+ } else {
367
+ let total_key_count: usize = self
368
+ . table_stats
369
+ . values ( )
370
+ . map ( |s| s. total_key_count as usize )
371
+ . sum ( ) ;
372
+
373
+ if total_key_count == 0 {
374
+ ( 0 , 0 )
375
+ } else {
376
+ let total_key_size: usize = self
377
+ . table_stats
378
+ . values ( )
379
+ . map ( |s| s. total_key_size as usize )
380
+ . sum ( ) ;
381
+
382
+ let total_value_size: usize = self
383
+ . table_stats
384
+ . values ( )
385
+ . map ( |s| s. total_value_size as usize )
386
+ . sum ( ) ;
387
+
388
+ (
389
+ total_key_size / total_key_count,
390
+ total_value_size / total_key_count,
391
+ )
392
+ }
393
+ } ;
394
+
395
+ let ( min_epoch, max_epoch) = {
396
+ if self . epoch_set . is_empty ( ) {
397
+ ( u64:: MAX , u64:: MIN )
398
+ } else {
399
+ (
400
+ * self . epoch_set . first ( ) . unwrap ( ) ,
401
+ * self . epoch_set . last ( ) . unwrap ( ) ,
402
+ )
403
+ }
404
+ } ;
405
+
366
406
let sst_info = SstableInfo {
367
407
id : self . sstable_id ,
368
408
key_range : Some ( risingwave_pb:: hummock:: KeyRange {
@@ -377,43 +417,29 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
377
417
total_key_count : self . total_key_count ,
378
418
divide_version : 0 ,
379
419
uncompressed_file_size : uncompressed_file_size + meta. encoded_size ( ) as u64 ,
380
- min_epoch : cmp:: min ( self . min_epoch , tombstone_min_epoch) ,
381
- max_epoch : cmp:: max ( self . max_epoch , tombstone_max_epoch) ,
420
+ min_epoch : cmp:: min ( min_epoch, tombstone_min_epoch) ,
421
+ max_epoch : cmp:: max ( max_epoch, tombstone_max_epoch) ,
382
422
} ;
383
423
tracing:: trace!(
384
- "meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {}" ,
424
+ "meta_size {} bloom_filter_size {} add_key_counts {} stale_key_count {} min_epoch {} max_epoch {} epoch_count {} " ,
385
425
meta. encoded_size( ) ,
386
426
meta. bloom_filter. len( ) ,
387
427
self . total_key_count,
388
428
self . stale_key_count,
389
- self . min_epoch,
390
- self . max_epoch,
429
+ min_epoch,
430
+ max_epoch,
431
+ self . epoch_set. len( )
391
432
) ;
392
433
let bloom_filter_size = meta. bloom_filter . len ( ) ;
393
- let ( avg_key_size, avg_value_size) = if self . table_stats . is_empty ( ) {
394
- ( 0 , 0 )
395
- } else {
396
- let avg_key_size = self
397
- . table_stats
398
- . values ( )
399
- . map ( |s| s. total_key_size as usize )
400
- . sum :: < usize > ( )
401
- / self . table_stats . len ( ) ;
402
- let avg_value_size = self
403
- . table_stats
404
- . values ( )
405
- . map ( |s| s. total_value_size as usize )
406
- . sum :: < usize > ( )
407
- / self . table_stats . len ( ) ;
408
- ( avg_key_size, avg_value_size)
409
- } ;
434
+
410
435
let writer_output = self . writer . finish ( meta) . await ?;
411
436
Ok ( SstableBuilderOutput :: < W :: Output > {
412
437
sst_info : LocalSstableInfo :: with_stats ( sst_info, self . table_stats ) ,
413
438
bloom_filter_size,
414
439
writer_output,
415
440
avg_key_size,
416
441
avg_value_size,
442
+ epoch_count : self . epoch_set . len ( ) ,
417
443
} )
418
444
}
419
445
0 commit comments