@@ -393,59 +393,36 @@ private static InternalAggregations aggregate(
393
393
if (resultSetSize == 0 ) {
394
394
return null ;
395
395
}
396
- if ( resultSetSize == 1 ) {
397
- if (partialResults .hasNext ()) {
398
- return InternalAggregations . reduce ( partialResults .next (), reduceContext );
399
- }
396
+ final InternalAggregations first ;
397
+ if (partialResults .hasNext ()) {
398
+ first = partialResults .next ();
399
+ } else {
400
400
try (var delayable = toConsume .next ().consumeAggs ()) {
401
- return InternalAggregations . reduce ( delayable .expand (), reduceContext );
401
+ first = delayable .expand ();
402
402
}
403
403
}
404
+ if (resultSetSize == 1 ) {
405
+ return InternalAggregations .reduce (first , reduceContext );
406
+ }
404
407
try {
405
408
// general case
406
- if (partialResults .hasNext ()) {
407
- return consumeAggResults (partialResults , toConsume , createReducer (resultSetSize , reduceContext , partialResults .next ()));
408
- }
409
- AggregatorsReducer reducer ;
410
- try (var delayable = toConsume .next ().consumeAggs ()) {
411
- reducer = createReducer (resultSetSize , reduceContext , delayable .expand ());
409
+ try (var reducer = new AggregatorsReducer (first , reduceContext , resultSetSize )) {
410
+ reducer .accept (first );
411
+ partialResults .forEachRemaining (reducer ::accept );
412
+ while (toConsume .hasNext ()) {
413
+ final InternalAggregations next ;
414
+ try (var delayable = toConsume .next ().consumeAggs ()) {
415
+ next = delayable .expand ();
416
+ }
417
+ reducer .accept (next );
418
+ }
419
+ return reducer .get ();
412
420
}
413
- return consumeAggResults (partialResults , toConsume , reducer );
414
421
} finally {
415
422
toConsume .forEachRemaining (QuerySearchResult ::releaseAggs );
416
423
}
417
424
}
418
425
419
- private static AggregatorsReducer createReducer (int resultSetSize , AggregationReduceContext reduceContext , InternalAggregations first ) {
420
- boolean success = false ;
421
- var reducer = new AggregatorsReducer (first , reduceContext , resultSetSize );
422
- try {
423
- reducer .accept (first );
424
- success = true ;
425
- return reducer ;
426
- } finally {
427
- if (success == false ) {
428
- reducer .close ();
429
- }
430
- }
431
- }
432
-
433
- private static InternalAggregations consumeAggResults (
434
- Iterator <InternalAggregations > partialResults ,
435
- Iterator <QuerySearchResult > toConsume ,
436
- AggregatorsReducer reducer
437
- ) {
438
- try (reducer ) {
439
- partialResults .forEachRemaining (reducer ::accept );
440
- while (toConsume .hasNext ()) {
441
- try (var delayable = toConsume .next ().consumeAggs ()) {
442
- reducer .accept (delayable .expand ());
443
- }
444
- }
445
- return reducer .get ();
446
- }
447
- }
448
-
449
426
public int getNumReducePhases () {
450
427
return numReducePhases ;
451
428
}
0 commit comments