@@ -236,8 +236,8 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
236
236
self . mem_context . add ( memory_usage_diff) ;
237
237
}
238
238
239
- // generate output data chunks
240
- let mut result = groups. into_iter ( ) ;
239
+ // Don't use `into_iter` here, it may cause memory leak.
240
+ let mut result = groups. iter_mut ( ) ;
241
241
let cardinality = self . chunk_size ;
242
242
loop {
243
243
let mut group_builders: Vec < _ > = self
@@ -259,9 +259,9 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
259
259
array_len += 1 ;
260
260
key. deserialize_to_builders ( & mut group_builders[ ..] , & self . group_key_types ) ?;
261
261
states
262
- . into_iter ( )
262
+ . iter_mut ( )
263
263
. zip_eq_fast ( & mut agg_builders)
264
- . try_for_each ( |( mut aggregator, builder) | aggregator. output ( builder) ) ?;
264
+ . try_for_each ( |( aggregator, builder) | aggregator. output ( builder) ) ?;
265
265
}
266
266
if !has_next {
267
267
break ; // exit loop
@@ -281,6 +281,11 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
281
281
282
282
#[ cfg( test) ]
283
283
mod tests {
284
+ use std:: alloc:: { AllocError , Allocator , Global , Layout } ;
285
+ use std:: ptr:: NonNull ;
286
+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
287
+ use std:: sync:: Arc ;
288
+
284
289
use prometheus:: IntGauge ;
285
290
use risingwave_common:: catalog:: { Field , Schema } ;
286
291
use risingwave_common:: test_prelude:: DataChunkTestExt ;
@@ -296,9 +301,11 @@ mod tests {
296
301
297
302
#[ tokio:: test]
298
303
async fn execute_int32_grouped ( ) {
299
- let src_exec = Box :: new ( MockExecutor :: with_chunk (
300
- DataChunk :: from_pretty (
301
- "i i i
304
+ let parent_mem = MemoryContext :: root ( IntGauge :: new ( "root_memory_usage" , " " ) . unwrap ( ) ) ;
305
+ {
306
+ let src_exec = Box :: new ( MockExecutor :: with_chunk (
307
+ DataChunk :: from_pretty (
308
+ "i i i
302
309
0 1 1
303
310
1 1 1
304
311
0 0 1
@@ -307,68 +314,75 @@ mod tests {
307
314
0 0 2
308
315
1 1 3
309
316
0 1 2" ,
310
- ) ,
311
- Schema :: new ( vec ! [
312
- Field :: unnamed( DataType :: Int32 ) ,
313
- Field :: unnamed( DataType :: Int32 ) ,
314
- Field :: unnamed( DataType :: Int64 ) ,
315
- ] ) ,
316
- ) ) ;
317
-
318
- let agg_call = AggCall {
319
- r#type : Type :: Sum as i32 ,
320
- args : vec ! [ InputRef {
321
- index: 2 ,
322
- r#type: Some ( PbDataType {
323
- type_name: TypeName :: Int32 as i32 ,
317
+ ) ,
318
+ Schema :: new ( vec ! [
319
+ Field :: unnamed( DataType :: Int32 ) ,
320
+ Field :: unnamed( DataType :: Int32 ) ,
321
+ Field :: unnamed( DataType :: Int64 ) ,
322
+ ] ) ,
323
+ ) ) ;
324
+
325
+ let agg_call = AggCall {
326
+ r#type : Type :: Sum as i32 ,
327
+ args : vec ! [ InputRef {
328
+ index: 2 ,
329
+ r#type: Some ( PbDataType {
330
+ type_name: TypeName :: Int32 as i32 ,
331
+ ..Default :: default ( )
332
+ } ) ,
333
+ } ] ,
334
+ return_type : Some ( PbDataType {
335
+ type_name : TypeName :: Int64 as i32 ,
324
336
..Default :: default ( )
325
337
} ) ,
326
- } ] ,
327
- return_type : Some ( PbDataType {
328
- type_name : TypeName :: Int64 as i32 ,
329
- ..Default :: default ( )
330
- } ) ,
331
- distinct : false ,
332
- order_by : vec ! [ ] ,
333
- filter : None ,
334
- direct_args : vec ! [ ] ,
335
- } ;
336
-
337
- let agg_prost = HashAggNode {
338
- group_key : vec ! [ 0 , 1 ] ,
339
- agg_calls : vec ! [ agg_call] ,
340
- } ;
341
-
342
- let mem_context = MemoryContext :: root ( IntGauge :: new ( "memory_usage" , " " ) . unwrap ( ) ) ;
343
- let actual_exec = HashAggExecutorBuilder :: deserialize (
344
- & agg_prost,
345
- src_exec,
346
- TaskId :: default ( ) ,
347
- "HashAggExecutor" . to_string ( ) ,
348
- CHUNK_SIZE ,
349
- mem_context. clone ( ) ,
350
- )
351
- . unwrap ( ) ;
352
-
353
- // TODO: currently the order is fixed unless the hasher is changed
354
- let expect_exec = Box :: new ( MockExecutor :: with_chunk (
355
- DataChunk :: from_pretty (
356
- "i i I
338
+ distinct : false ,
339
+ order_by : vec ! [ ] ,
340
+ filter : None ,
341
+ direct_args : vec ! [ ] ,
342
+ } ;
343
+
344
+ let agg_prost = HashAggNode {
345
+ group_key : vec ! [ 0 , 1 ] ,
346
+ agg_calls : vec ! [ agg_call] ,
347
+ } ;
348
+
349
+ let mem_context = MemoryContext :: new (
350
+ Some ( parent_mem. clone ( ) ) ,
351
+ IntGauge :: new ( "memory_usage" , " " ) . unwrap ( ) ,
352
+ ) ;
353
+ let actual_exec = HashAggExecutorBuilder :: deserialize (
354
+ & agg_prost,
355
+ src_exec,
356
+ TaskId :: default ( ) ,
357
+ "HashAggExecutor" . to_string ( ) ,
358
+ CHUNK_SIZE ,
359
+ mem_context. clone ( ) ,
360
+ )
361
+ . unwrap ( ) ;
362
+
363
+ // TODO: currently the order is fixed unless the hasher is changed
364
+ let expect_exec = Box :: new ( MockExecutor :: with_chunk (
365
+ DataChunk :: from_pretty (
366
+ "i i I
357
367
1 0 1
358
368
0 0 3
359
369
0 1 3
360
370
1 1 6" ,
361
- ) ,
362
- Schema :: new ( vec ! [
363
- Field :: unnamed( DataType :: Int32 ) ,
364
- Field :: unnamed( DataType :: Int32 ) ,
365
- Field :: unnamed( DataType :: Int64 ) ,
366
- ] ) ,
367
- ) ) ;
368
- diff_executor_output ( actual_exec, expect_exec) . await ;
369
-
370
- // check estimated memory usage = 4 groups x state size
371
- assert_eq ! ( mem_context. get_bytes_used( ) as usize , 4 * 72 ) ;
371
+ ) ,
372
+ Schema :: new ( vec ! [
373
+ Field :: unnamed( DataType :: Int32 ) ,
374
+ Field :: unnamed( DataType :: Int32 ) ,
375
+ Field :: unnamed( DataType :: Int64 ) ,
376
+ ] ) ,
377
+ ) ) ;
378
+ diff_executor_output ( actual_exec, expect_exec) . await ;
379
+
380
+ // check estimated memory usage = 4 groups x state size
381
+ assert_eq ! ( mem_context. get_bytes_used( ) as usize , 4 * 72 ) ;
382
+ }
383
+
384
+ // Ensure that agg memory counter has been dropped.
385
+ assert_eq ! ( 0 , parent_mem. get_bytes_used( ) ) ;
372
386
}
373
387
374
388
#[ tokio:: test]
@@ -425,4 +439,61 @@ mod tests {
425
439
) ;
426
440
diff_executor_output ( actual_exec, Box :: new ( expect_exec) ) . await ;
427
441
}
442
+
443
+ /// A test to verify that `HashMap` may leak memory counter when using `into_iter`.
444
+ #[ test]
445
+ fn test_hashmap_into_iter_bug ( ) {
446
+ let dropped: Arc < AtomicBool > = Arc :: new ( AtomicBool :: new ( false ) ) ;
447
+
448
+ {
449
+ struct MyAllocInner {
450
+ drop_flag : Arc < AtomicBool > ,
451
+ }
452
+
453
+ #[ derive( Clone ) ]
454
+ struct MyAlloc {
455
+ inner : Arc < MyAllocInner > ,
456
+ }
457
+
458
+ impl Drop for MyAllocInner {
459
+ fn drop ( & mut self ) {
460
+ println ! ( "MyAlloc freed." ) ;
461
+ self . drop_flag . store ( true , Ordering :: SeqCst ) ;
462
+ }
463
+ }
464
+
465
+ unsafe impl Allocator for MyAlloc {
466
+ fn allocate (
467
+ & self ,
468
+ layout : Layout ,
469
+ ) -> std:: result:: Result < NonNull < [ u8 ] > , AllocError > {
470
+ let g = Global ;
471
+ g. allocate ( layout)
472
+ }
473
+
474
+ unsafe fn deallocate ( & self , ptr : NonNull < u8 > , layout : Layout ) {
475
+ let g = Global ;
476
+ g. deallocate ( ptr, layout)
477
+ }
478
+ }
479
+
480
+ let mut map = hashbrown:: HashMap :: with_capacity_in (
481
+ 10 ,
482
+ MyAlloc {
483
+ inner : Arc :: new ( MyAllocInner {
484
+ drop_flag : dropped. clone ( ) ,
485
+ } ) ,
486
+ } ,
487
+ ) ;
488
+ for i in 0 ..10 {
489
+ map. entry ( i) . or_insert_with ( || "i" . to_string ( ) ) ;
490
+ }
491
+
492
+ for ( k, v) in map {
493
+ println ! ( "{}, {}" , k, v) ;
494
+ }
495
+ }
496
+
497
+ assert ! ( !dropped. load( Ordering :: SeqCst ) ) ;
498
+ }
428
499
}
0 commit comments