|
15 | 15 | import org.elasticsearch.action.search.SearchPhaseController.TopDocsStats;
|
16 | 16 | import org.elasticsearch.common.breaker.CircuitBreaker;
|
17 | 17 | import org.elasticsearch.common.breaker.CircuitBreakingException;
|
| 18 | +import org.elasticsearch.common.collect.Iterators; |
18 | 19 | import org.elasticsearch.common.io.stream.DelayableWriteable;
|
19 | 20 | import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
|
20 | 21 | import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
@@ -306,36 +307,29 @@ interface ReleasableIterator extends Iterator<InternalAggregations>, Releasable
|
306 | 307 |
|
307 | 308 | private Releasable toRelease;
|
308 | 309 |
|
309 |
| - private boolean onFirst; |
310 |
| - |
311 | 310 | @Override
|
312 | 311 | public void close() {
|
313 | 312 | Releasables.close(toRelease);
|
314 | 313 | }
|
315 | 314 |
|
316 | 315 | @Override
|
317 | 316 | public boolean hasNext() {
|
318 |
| - if (onFirst && lastMerge != null) { |
319 |
| - return true; |
320 |
| - } |
321 | 317 | return toConsume.hasNext();
|
322 | 318 | }
|
323 | 319 |
|
324 | 320 | @Override
|
325 | 321 | public InternalAggregations next() {
|
326 |
| - if (onFirst) { |
327 |
| - onFirst = false; |
328 |
| - if (lastMerge != null) { |
329 |
| - return lastMerge.reducedAggs; |
330 |
| - } |
331 |
| - } |
332 | 322 | var res = toConsume.next().consumeAggs();
|
333 | 323 | Releasables.close(toRelease);
|
334 | 324 | toRelease = res;
|
335 | 325 | return res.expand();
|
336 | 326 | }
|
337 | 327 | }) {
|
338 |
| - return InternalAggregations.topLevelReduce(aggsIter, resultSetSize, reduceContext); |
| 328 | + return InternalAggregations.topLevelReduce( |
| 329 | + lastMerge == null ? aggsIter : Iterators.concat(Iterators.single(lastMerge.reducedAggs), aggsIter), |
| 330 | + resultSetSize, |
| 331 | + reduceContext |
| 332 | + ); |
339 | 333 | } finally {
|
340 | 334 | toConsume.forEachRemaining(QuerySearchResult::releaseAggs);
|
341 | 335 | }
|
|
0 commit comments