@@ -14,7 +14,9 @@ use re_data_store::{
14
14
} ;
15
15
use re_log_types:: { EntityPath , RowId , StoreId , TimeInt , Timeline } ;
16
16
use re_query:: ArchetypeView ;
17
- use re_types_core:: { components:: InstanceKey , Archetype , ArchetypeName , Component , ComponentName } ;
17
+ use re_types_core:: {
18
+ components:: InstanceKey , Archetype , ArchetypeName , Component , ComponentName , SizeBytes as _,
19
+ } ;
18
20
19
21
use crate :: { ErasedFlatVecDeque , FlatVecDeque } ;
20
22
@@ -49,10 +51,8 @@ static CACHES: Lazy<StoreSubscriberHandle> =
49
51
Lazy :: new ( || re_data_store:: DataStore :: register_subscriber ( Box :: < Caches > :: default ( ) ) ) ;
50
52
51
53
/// Maintains the top-level cache mappings.
52
- //
53
- // TODO(#4730): SizeBytes support + size stats + mem panel
54
54
#[ derive( Default ) ]
55
- pub struct Caches ( RwLock < HashMap < CacheKey , CachesPerArchetype > > ) ;
55
+ pub struct Caches ( pub ( crate ) RwLock < HashMap < CacheKey , CachesPerArchetype > > ) ;
56
56
57
57
#[ derive( Default ) ]
58
58
pub struct CachesPerArchetype {
@@ -65,7 +65,7 @@ pub struct CachesPerArchetype {
65
65
//
66
66
// TODO(cmc): At some point we should probably just store the PoV and optional components rather
67
67
// than an `ArchetypeName`: the query system doesn't care about archetypes.
68
- latest_at_per_archetype : RwLock < HashMap < ArchetypeName , Arc < RwLock < LatestAtCache > > > > ,
68
+ pub ( crate ) latest_at_per_archetype : RwLock < HashMap < ArchetypeName , Arc < RwLock < LatestAtCache > > > > ,
69
69
70
70
/// Everything greater than or equal to this timestamp has been asynchronously invalidated.
71
71
///
@@ -116,8 +116,8 @@ impl Caches {
116
116
re_data_store:: DataStore :: with_subscriber_once ( * CACHES , move |caches : & Caches | {
117
117
let mut caches = caches. 0 . write ( ) ;
118
118
119
- let caches_per_archetype = caches. entry ( key) . or_default ( ) ;
120
- caches_per_archetype. handle_pending_invalidation ( ) ;
119
+ let caches_per_archetype = caches. entry ( key. clone ( ) ) . or_default ( ) ;
120
+ caches_per_archetype. handle_pending_invalidation ( & key ) ;
121
121
122
122
let mut latest_at_per_archetype =
123
123
caches_per_archetype. latest_at_per_archetype . write ( ) ;
@@ -133,6 +133,12 @@ impl Caches {
133
133
let mut cache = cache. write ( ) ;
134
134
f ( & mut cache)
135
135
}
136
+
137
+ #[ inline]
138
+ pub ( crate ) fn with < F : FnMut ( & Caches ) -> R , R > ( f : F ) -> R {
139
+ // NOTE: downcasting cannot fail, this is our own private handle.
140
+ re_data_store:: DataStore :: with_subscriber ( * CACHES , f) . unwrap ( )
141
+ }
136
142
}
137
143
138
144
/// Uniquely identifies cached query results in the [`Caches`].
@@ -264,7 +270,7 @@ impl CachesPerArchetype {
264
270
///
265
271
/// Invalidation is deferred to query time because it is far more efficient that way: the frame
266
272
/// time effectively behaves as a natural micro-batching mechanism.
267
- fn handle_pending_invalidation ( & mut self ) {
273
+ fn handle_pending_invalidation ( & mut self , key : & CacheKey ) {
268
274
let pending_timeless_invalidation = self . pending_timeless_invalidation ;
269
275
let pending_timeful_invalidation = self . pending_timeful_invalidation . is_some ( ) ;
270
276
@@ -281,15 +287,39 @@ impl CachesPerArchetype {
281
287
latest_at_cache. timeless = None ;
282
288
}
283
289
290
+ let mut removed_bytes = 0u64 ;
284
291
if let Some ( min_time) = self . pending_timeful_invalidation {
285
292
latest_at_cache
286
293
. per_query_time
287
294
. retain ( |& query_time, _| query_time < min_time) ;
288
295
289
- latest_at_cache
290
- . per_data_time
291
- . retain ( |& data_time, _| data_time < min_time) ;
296
+ latest_at_cache. per_data_time . retain ( |& data_time, bucket| {
297
+ if data_time < min_time {
298
+ return true ;
299
+ }
300
+
301
+ // Only if that bucket is about to be dropped.
302
+ if Arc :: strong_count ( bucket) == 1 {
303
+ removed_bytes += bucket. read ( ) . total_size_bytes ;
304
+ }
305
+
306
+ false
307
+ } ) ;
292
308
}
309
+
310
+ latest_at_cache. total_size_bytes = latest_at_cache
311
+ . total_size_bytes
312
+ . checked_sub ( removed_bytes)
313
+ . unwrap_or_else ( || {
314
+ re_log:: debug!(
315
+ store_id = %key. store_id,
316
+ entity_path = %key. entity_path,
317
+ current = latest_at_cache. total_size_bytes,
318
+ removed = removed_bytes,
319
+ "book keeping underflowed"
320
+ ) ;
321
+ u64:: MIN
322
+ } ) ;
293
323
}
294
324
295
325
self . pending_timeful_invalidation = None ;
@@ -328,9 +358,14 @@ pub struct CacheBucket {
328
358
// TODO(#4733): Don't denormalize auto-generated instance keys.
329
359
// TODO(#4734): Don't denormalize splatted values.
330
360
pub ( crate ) components : BTreeMap < ComponentName , Box < dyn ErasedFlatVecDeque + Send + Sync > > ,
361
+
362
+ /// The total size in bytes stored in this bucket.
363
+ ///
364
+ /// Only used so we can decrement the global cache size when the last reference to a bucket
365
+ /// gets dropped.
366
+ pub ( crate ) total_size_bytes : u64 ,
331
367
//
332
368
// TODO(cmc): secondary cache
333
- // TODO(#4730): size stats: this requires codegen'ing SizeBytes for all components!
334
369
}
335
370
336
371
impl CacheBucket {
@@ -387,12 +422,14 @@ macro_rules! impl_insert {
387
422
#[ doc = "Inserts the contents of the given [`ArchetypeView`], which are made of the specified" ]
388
423
#[ doc = "`" $N "` point-of-view components and `" $M "` optional components, to the cache." ]
389
424
#[ doc = "" ]
425
+ #[ doc = "Returns the size in bytes of the data that was cached." ]
426
+ #[ doc = "" ]
390
427
#[ doc = "`query_time` must be the time of query, _not_ of the resulting data." ]
391
428
pub fn [ <insert_pov$N _comp$M>] <A , $( $pov, ) + $( $comp) ,* >(
392
429
& mut self ,
393
430
query_time: TimeInt ,
394
431
arch_view: & ArchetypeView <A >,
395
- ) -> :: re_query:: Result <( ) >
432
+ ) -> :: re_query:: Result <u64 >
396
433
where
397
434
A : Archetype ,
398
435
$( $pov: Component + Send + Sync + ' static , ) +
@@ -401,21 +438,31 @@ macro_rules! impl_insert {
401
438
// NOTE: not `profile_function!` because we want them merged together.
402
439
re_tracing:: profile_scope!( "CacheBucket::insert" , format!( "arch={} pov={} comp={}" , A :: name( ) , $N, $M) ) ;
403
440
404
- let Self {
405
- data_times,
406
- pov_instance_keys,
407
- components: _,
408
- } = self ;
409
-
410
441
let pov_row_id = arch_view. primary_row_id( ) ;
411
- let index = data_times. partition_point( |t| t < & ( query_time, pov_row_id) ) ;
442
+ let index = self . data_times. partition_point( |t| t < & ( query_time, pov_row_id) ) ;
443
+
444
+ let mut added_size_bytes = 0u64 ;
445
+
446
+ self . data_times. insert( index, ( query_time, pov_row_id) ) ;
447
+ added_size_bytes += ( query_time, pov_row_id) . total_size_bytes( ) ;
448
+
449
+ {
450
+ // The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
451
+ // instead, that way we can efficiently computes its size while we're at it.
452
+ let added: FlatVecDeque <InstanceKey > = arch_view
453
+ . iter_instance_keys( )
454
+ . collect:: <VecDeque <InstanceKey >>( )
455
+ . into( ) ;
456
+ added_size_bytes += added. total_size_bytes( ) ;
457
+ self . pov_instance_keys. insert_deque( index, added) ;
458
+ }
459
+
460
+ $( added_size_bytes += self . insert_component:: <A , $pov>( index, arch_view) ?; ) +
461
+ $( added_size_bytes += self . insert_component_opt:: <A , $comp>( index, arch_view) ?; ) *
412
462
413
- data_times. insert( index, ( query_time, pov_row_id) ) ;
414
- pov_instance_keys. insert( index, arch_view. iter_instance_keys( ) ) ;
415
- $( self . insert_component:: <A , $pov>( index, arch_view) ?; ) +
416
- $( self . insert_component_opt:: <A , $comp>( index, arch_view) ?; ) *
463
+ self . total_size_bytes += added_size_bytes;
417
464
418
- Ok ( ( ) )
465
+ Ok ( added_size_bytes )
419
466
} }
420
467
} ;
421
468
@@ -436,7 +483,7 @@ impl CacheBucket {
436
483
& mut self ,
437
484
query_time : TimeInt ,
438
485
arch_view : & ArchetypeView < A > ,
439
- ) -> :: re_query:: Result < ( ) >
486
+ ) -> :: re_query:: Result < u64 >
440
487
where
441
488
A : Archetype ,
442
489
R1 : Component + Send + Sync + ' static ,
@@ -453,42 +500,58 @@ impl CacheBucket {
453
500
& mut self ,
454
501
at : usize ,
455
502
arch_view : & ArchetypeView < A > ,
456
- ) -> re_query:: Result < ( ) > {
503
+ ) -> re_query:: Result < u64 > {
457
504
re_tracing:: profile_function!( C :: name( ) ) ;
458
505
459
506
let data = self
460
507
. components
461
508
. entry ( C :: name ( ) )
462
509
. or_insert_with ( || Box :: new ( FlatVecDeque :: < C > :: new ( ) ) ) ;
463
510
511
+ // The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
512
+ // instead, that way we can efficiently computes its size while we're at it.
513
+ let added: FlatVecDeque < C > = arch_view
514
+ . iter_required_component :: < C > ( ) ?
515
+ . collect :: < VecDeque < C > > ( )
516
+ . into ( ) ;
517
+ let added_size_bytes = added. total_size_bytes ( ) ;
518
+
464
519
// NOTE: downcast cannot fail, we create it just above.
465
520
let data = data. as_any_mut ( ) . downcast_mut :: < FlatVecDeque < C > > ( ) . unwrap ( ) ;
466
- data. insert ( at, arch_view . iter_required_component :: < C > ( ) ? ) ;
521
+ data. insert_deque ( at, added ) ;
467
522
468
- Ok ( ( ) )
523
+ Ok ( added_size_bytes )
469
524
}
470
525
471
526
#[ inline]
472
527
fn insert_component_opt < A : Archetype , C : Component + Send + Sync + ' static > (
473
528
& mut self ,
474
529
at : usize ,
475
530
arch_view : & ArchetypeView < A > ,
476
- ) -> re_query:: Result < ( ) > {
531
+ ) -> re_query:: Result < u64 > {
477
532
re_tracing:: profile_function!( C :: name( ) ) ;
478
533
479
534
let data = self
480
535
. components
481
536
. entry ( C :: name ( ) )
482
537
. or_insert_with ( || Box :: new ( FlatVecDeque :: < Option < C > > :: new ( ) ) ) ;
483
538
539
+ // The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
540
+ // instead, that way we can efficiently computes its size while we're at it.
541
+ let added: FlatVecDeque < Option < C > > = arch_view
542
+ . iter_optional_component :: < C > ( ) ?
543
+ . collect :: < VecDeque < Option < C > > > ( )
544
+ . into ( ) ;
545
+ let added_size_bytes = added. total_size_bytes ( ) ;
546
+
484
547
// NOTE: downcast cannot fail, we create it just above.
485
548
let data = data
486
549
. as_any_mut ( )
487
550
. downcast_mut :: < FlatVecDeque < Option < C > > > ( )
488
551
. unwrap ( ) ;
489
- data. insert ( at, arch_view . iter_optional_component :: < C > ( ) ? ) ;
552
+ data. insert_deque ( at, added ) ;
490
553
491
- Ok ( ( ) )
554
+ Ok ( added_size_bytes )
492
555
}
493
556
}
494
557
@@ -526,4 +589,7 @@ pub struct LatestAtCache {
526
589
// NOTE: Lives separately so we don't pay the extra `Option` cost in the much more common
527
590
// timeful case.
528
591
pub timeless : Option < CacheBucket > ,
592
+
593
+ /// Total size of the data stored in this cache in bytes.
594
+ pub total_size_bytes : u64 ,
529
595
}
0 commit comments