14
14
15
15
use std:: collections:: { HashMap , HashSet } ;
16
16
use std:: marker:: PhantomData ;
17
+ use std:: ptr:: NonNull ;
17
18
use std:: sync:: Arc ;
18
19
19
20
use futures:: { stream, StreamExt , TryStreamExt } ;
@@ -436,11 +437,17 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
436
437
437
438
// Calculate current outputs, concurrently.
438
439
let futs = keys_in_batch. into_iter ( ) . map ( |key| {
439
- // Pop out the agg group temporarily.
440
- let mut agg_group = vars
441
- . agg_group_cache
442
- . pop ( & key)
443
- . expect ( "changed group must have corresponding AggGroup" ) ;
440
+ // Get agg group of the key.
441
+ let agg_group = {
442
+ let mut ptr: NonNull < _ > = vars
443
+ . agg_group_cache
444
+ . get_mut ( & key)
445
+ . expect ( "changed group must have corresponding AggGroup" )
446
+ . into ( ) ;
447
+ // SAFETY: `key`s in `keys_in_batch` are unique by nature, because they're
448
+ // from `group_change_set` which is a set.
449
+ unsafe { ptr. as_mut ( ) }
450
+ } ;
444
451
async {
445
452
let curr_outputs = agg_group. get_outputs ( & this. storages ) . await ?;
446
453
Ok :: < _ , StreamExecutorError > ( ( key, agg_group, curr_outputs) )
@@ -452,7 +459,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
452
459
. try_collect ( )
453
460
. await ?;
454
461
455
- for ( key, mut agg_group, curr_outputs) in outputs_in_batch {
462
+ for ( key, agg_group, curr_outputs) in outputs_in_batch {
456
463
let AggChangesInfo {
457
464
n_appended_ops,
458
465
result_row,
@@ -477,9 +484,6 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
477
484
this. result_table . insert ( result_row) ;
478
485
}
479
486
}
480
-
481
- // Put the agg group back into the agg group cache.
482
- vars. agg_group_cache . put ( key, agg_group) ;
483
487
}
484
488
485
489
let columns = builders
0 commit comments