@@ -41,14 +41,14 @@ use super::{
 use crate::cache::{cache_may_stale, new_with_hasher, ExecutorCache};
 use crate::common::table::state_table::StateTable;
 use crate::error::StreamResult;
-use crate::executor::aggregation::{generate_agg_schema, AggCall, AggGroup};
+use crate::executor::aggregation::{generate_agg_schema, AggCall, AggGroup as GenericAggGroup};
 use crate::executor::error::StreamExecutorError;
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{BoxedMessageStream, Message};
 use crate::task::AtomicU64Ref;
 
-type BoxedAggGroup<S> = Box<AggGroup<S, OnlyOutputIfHasInput>>;
-type AggGroupCache<K, S> = ExecutorCache<K, BoxedAggGroup<S>, PrecomputedBuildHasher>;
+type AggGroup<S> = GenericAggGroup<S, OnlyOutputIfHasInput>;
+type AggGroupCache<K, S> = ExecutorCache<K, AggGroup<S>, PrecomputedBuildHasher>;
 
 /// [`HashAggExecutor`] could process large amounts of data using a state backend. It works as
 /// follows:
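This first hunk imports the generic type under a different name (`AggGroup as GenericAggGroup`) and rebinds the familiar name `AggGroup` to a module-local alias that fixes the `OnlyOutputIfHasInput` strategy, so the rest of the file can keep writing `AggGroup<S>` while the old `BoxedAggGroup` alias goes away. A minimal standalone sketch of that alias pattern, with illustrative stand-in types that are not the real crate items:

// Sketch only: `aggregation`, `AggGroup`, and `OnlyOutputIfHasInput` here are
// simplified stand-ins, not the actual items from the crate.
mod aggregation {
    /// A generic group, parameterized over its state and an output strategy.
    pub struct AggGroup<S, Strategy> {
        pub state: S,
        pub strategy: Strategy,
    }
}

/// Marker type selecting the "only emit output when the group saw input" behavior.
pub struct OnlyOutputIfHasInput;

// Import the generic type under a different name...
use aggregation::AggGroup as GenericAggGroup;

// ...and let the familiar name refer to a specialized alias, so the strategy
// parameter is spelled exactly once and use sites stay unchanged.
type AggGroup<S> = GenericAggGroup<S, OnlyOutputIfHasInput>;

fn main() {
    // Type positions (e.g. the cache's value type) can keep saying `AggGroup<S>`
    // without mentioning the strategy parameter.
    let group: AggGroup<u64> = GenericAggGroup {
        state: 0,
        strategy: OnlyOutputIfHasInput,
    };
    println!("state = {}", group.state);
}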
@@ -255,19 +255,17 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
         Some(async {
             // Create `AggGroup` for the current group if not exists. This will
             // fetch previous agg result from the result table.
-            let agg_group = Box::new(
-                AggGroup::create(
-                    Some(key.deserialize(group_key_types)?),
-                    &this.agg_calls,
-                    &this.storages,
-                    &this.result_table,
-                    &this.input_pk_indices,
-                    this.row_count_index,
-                    this.extreme_cache_size,
-                    &this.input_schema,
-                )
-                .await?,
-            );
+            let agg_group = AggGroup::create(
+                Some(key.deserialize(group_key_types)?),
+                &this.agg_calls,
+                &this.storages,
+                &this.result_table,
+                &this.input_pk_indices,
+                this.row_count_index,
+                this.extreme_cache_size,
+                &this.input_schema,
+            )
+            .await?;
             Ok::<_, StreamExecutorError>((key.clone(), agg_group))
         })
     }
@@ -345,7 +343,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
 
         // Apply chunk to each of the state (per agg_call), for each group.
         for (key, visibility) in group_visibilities {
-            let agg_group = vars.agg_group_cache.get_mut(&key).unwrap().as_mut();
+            let agg_group = vars.agg_group_cache.get_mut(&key).unwrap();
             let visibilities = call_visibilities
                 .iter()
                 .map(Option::as_ref)
@@ -421,8 +419,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
             let agg_group = vars
                 .agg_group_cache
                 .get_mut(key)
-                .expect("changed group must have corresponding AggGroup")
-                .as_mut();
+                .expect("changed group must have corresponding AggGroup");
             agg_group.flush_state_if_needed(&mut this.storages).await?;
         }
 
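Dropping the `Box` around the cached group is what removes the trailing `.as_mut()` at the two call sites above: once the cache stores the group by value, `get_mut` already yields `&mut AggGroup<S>` rather than `&mut Box<...>`. A rough illustration of that difference, using `std::collections::HashMap` as a stand-in for `ExecutorCache` and a trivial struct in place of the real `AggGroup`:

// Sketch only: HashMap and this tiny AggGroup are stand-ins, not the real
// ExecutorCache or the crate's AggGroup.
use std::collections::HashMap;

struct AggGroup {
    row_count: u64,
}

impl AggGroup {
    fn apply_row(&mut self) {
        self.row_count += 1;
    }
}

fn main() {
    // Before: values are boxed, so reaching `&mut AggGroup` needs an extra hop.
    let mut boxed: HashMap<String, Box<AggGroup>> = HashMap::new();
    boxed.insert("key".into(), Box::new(AggGroup { row_count: 0 }));
    let group: &mut AggGroup = boxed.get_mut("key").unwrap().as_mut();
    group.apply_row();

    // After: values are stored inline, and get_mut already returns &mut AggGroup.
    let mut inline: HashMap<String, AggGroup> = HashMap::new();
    inline.insert("key".into(), AggGroup { row_count: 0 });
    let group = inline.get_mut("key").unwrap();
    group.apply_row();

    println!("{} {}", boxed["key"].row_count, inline["key"].row_count);
}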