Skip to content

Commit e48d828

Browse files
Release aggregations earlier during reduce (elastic#124520)
Release each hit's aggregations before moving on to the next hit, and unlink them from the shard result even earlier. Also, do the aggregation-reduce earlier in the reduce steps to reduce average heap use over time. To that effect, do not do the reduction in the search phase controller. This has the added benefit of removing any need for a fake aggs-reduce-context in scroll.
1 parent fb5a57e commit e48d828

File tree

6 files changed

+116
-108
lines changed

6 files changed

+116
-108
lines changed

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 66 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
import org.elasticsearch.action.search.SearchPhaseController.TopDocsStats;
1616
import org.elasticsearch.common.breaker.CircuitBreaker;
1717
import org.elasticsearch.common.breaker.CircuitBreakingException;
18+
import org.elasticsearch.common.collect.Iterators;
1819
import org.elasticsearch.common.io.stream.DelayableWriteable;
1920
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
2021
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
22+
import org.elasticsearch.core.Releasable;
23+
import org.elasticsearch.core.Releasables;
2124
import org.elasticsearch.search.SearchPhaseResult;
2225
import org.elasticsearch.search.SearchService;
2326
import org.elasticsearch.search.SearchShardTarget;
@@ -31,6 +34,7 @@
3134
import java.util.ArrayList;
3235
import java.util.Collections;
3336
import java.util.Comparator;
37+
import java.util.Iterator;
3438
import java.util.List;
3539
import java.util.concurrent.Executor;
3640
import java.util.concurrent.atomic.AtomicReference;
@@ -174,14 +178,10 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
174178
this.mergeResult = null;
175179
final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1);
176180
final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
177-
final List<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayList<>(resultSize) : null;
178181
if (mergeResult != null) {
179182
if (topDocsList != null) {
180183
topDocsList.add(mergeResult.reducedTopDocs);
181184
}
182-
if (aggsList != null) {
183-
aggsList.add(DelayableWriteable.referencing(mergeResult.reducedAggs));
184-
}
185185
}
186186
for (QuerySearchResult result : buffer) {
187187
topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
@@ -190,34 +190,39 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
190190
setShardIndex(topDocs.topDocs, result.getShardIndex());
191191
topDocsList.add(topDocs.topDocs);
192192
}
193-
if (aggsList != null) {
194-
aggsList.add(result.getAggs());
195-
}
196193
}
197194
SearchPhaseController.ReducedQueryPhase reducePhase;
198195
long breakerSize = circuitBreakerBytes;
196+
final InternalAggregations aggs;
199197
try {
200-
if (aggsList != null) {
198+
if (hasAggs) {
201199
// Add an estimate of the final reduce size
202200
breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
201+
aggs = aggregate(
202+
buffer.iterator(),
203+
mergeResult,
204+
resultSize,
205+
performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()
206+
);
207+
} else {
208+
aggs = null;
203209
}
204210
reducePhase = SearchPhaseController.reducedQueryPhase(
205211
results.asList(),
206-
aggsList,
212+
aggs,
207213
topDocsList == null ? Collections.emptyList() : topDocsList,
208214
topDocsStats,
209215
numReducePhases,
210216
false,
211-
aggReduceContextBuilder,
212-
queryPhaseRankCoordinatorContext,
213-
performFinalReduce
217+
queryPhaseRankCoordinatorContext
214218
);
219+
buffer = null;
215220
} finally {
216221
releaseAggs(buffer);
217222
}
218223
if (hasAggs
219224
// reduced aggregations can be null if all shards failed
220-
&& reducePhase.aggregations() != null) {
225+
&& aggs != null) {
221226

222227
// Update the circuit breaker to replace the estimation with the serialized size of the newly reduced result
223228
long finalSize = DelayableWriteable.getSerializedSize(reducePhase.aggregations()) - breakerSize;
@@ -249,17 +254,7 @@ private MergeResult partialReduce(
249254
toConsume.sort(RESULT_COMPARATOR);
250255

251256
final TopDocs newTopDocs;
252-
final InternalAggregations newAggs;
253-
final List<DelayableWriteable<InternalAggregations>> aggsList;
254257
final int resultSetSize = toConsume.size() + (lastMerge != null ? 1 : 0);
255-
if (hasAggs) {
256-
aggsList = new ArrayList<>(resultSetSize);
257-
if (lastMerge != null) {
258-
aggsList.add(DelayableWriteable.referencing(lastMerge.reducedAggs));
259-
}
260-
} else {
261-
aggsList = null;
262-
}
263258
List<TopDocs> topDocsList;
264259
if (hasTopDocs) {
265260
topDocsList = new ArrayList<>(resultSetSize);
@@ -269,14 +264,12 @@ private MergeResult partialReduce(
269264
} else {
270265
topDocsList = null;
271266
}
267+
final InternalAggregations newAggs;
272268
try {
273269
for (QuerySearchResult result : toConsume) {
274270
topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
275271
SearchShardTarget target = result.getSearchShardTarget();
276272
processedShards.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
277-
if (aggsList != null) {
278-
aggsList.add(result.getAggs());
279-
}
280273
if (topDocsList != null) {
281274
TopDocsAndMaxScore topDocs = result.consumeTopDocs();
282275
setShardIndex(topDocs.topDocs, result.getShardIndex());
@@ -285,9 +278,10 @@ private MergeResult partialReduce(
285278
}
286279
// we have to merge here in the same way we collect on a shard
287280
newTopDocs = topDocsList == null ? null : mergeTopDocs(topDocsList, topNSize, 0);
288-
newAggs = aggsList == null
289-
? null
290-
: InternalAggregations.topLevelReduceDelayable(aggsList, aggReduceContextBuilder.forPartialReduction());
281+
newAggs = hasAggs
282+
? aggregate(toConsume.iterator(), lastMerge, resultSetSize, aggReduceContextBuilder.forPartialReduction())
283+
: null;
284+
toConsume = null;
291285
} finally {
292286
releaseAggs(toConsume);
293287
}
@@ -302,6 +296,45 @@ private MergeResult partialReduce(
302296
return new MergeResult(processedShards, newTopDocs, newAggs, newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0);
303297
}
304298

299+
private static InternalAggregations aggregate(
300+
Iterator<QuerySearchResult> toConsume,
301+
MergeResult lastMerge,
302+
int resultSetSize,
303+
AggregationReduceContext reduceContext
304+
) {
305+
interface ReleasableIterator extends Iterator<InternalAggregations>, Releasable {}
306+
try (var aggsIter = new ReleasableIterator() {
307+
308+
private Releasable toRelease;
309+
310+
@Override
311+
public void close() {
312+
Releasables.close(toRelease);
313+
}
314+
315+
@Override
316+
public boolean hasNext() {
317+
return toConsume.hasNext();
318+
}
319+
320+
@Override
321+
public InternalAggregations next() {
322+
var res = toConsume.next().consumeAggs();
323+
Releasables.close(toRelease);
324+
toRelease = res;
325+
return res.expand();
326+
}
327+
}) {
328+
return InternalAggregations.topLevelReduce(
329+
lastMerge == null ? aggsIter : Iterators.concat(Iterators.single(lastMerge.reducedAggs), aggsIter),
330+
resultSetSize,
331+
reduceContext
332+
);
333+
} finally {
334+
toConsume.forEachRemaining(QuerySearchResult::releaseAggs);
335+
}
336+
}
337+
305338
public int getNumReducePhases() {
306339
return numReducePhases;
307340
}
@@ -517,8 +550,10 @@ public void onFailure(Exception exc) {
517550
}
518551

519552
private static void releaseAggs(List<QuerySearchResult> toConsume) {
520-
for (QuerySearchResult result : toConsume) {
521-
result.releaseAggs();
553+
if (toConsume != null) {
554+
for (QuerySearchResult result : toConsume) {
555+
result.releaseAggs();
556+
}
522557
}
523558
}
524559

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import org.apache.lucene.search.TotalHits;
2121
import org.apache.lucene.search.TotalHits.Relation;
2222
import org.elasticsearch.common.breaker.CircuitBreaker;
23-
import org.elasticsearch.common.io.stream.DelayableWriteable;
2423
import org.elasticsearch.common.lucene.Lucene;
2524
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
2625
import org.elasticsearch.common.util.Maps;
2726
import org.elasticsearch.common.util.concurrent.AtomicArray;
27+
import org.elasticsearch.core.Nullable;
2828
import org.elasticsearch.index.fielddata.IndexFieldData;
2929
import org.elasticsearch.lucene.grouping.TopFieldGroups;
3030
import org.elasticsearch.search.DocValueFormat;
@@ -402,22 +402,20 @@ private static SearchHits getHits(
402402
/**
403403
* Reduces the given query results and consumes all aggregations and profile results.
404404
* @param queryResults a list of non-null query shard results
405-
* @param bufferedAggs a list of pre-collected aggregations.
405+
* @param reducedAggs already reduced aggregations
406406
* @param bufferedTopDocs a list of pre-collected top docs.
407407
* @param numReducePhases the number of non-final reduce phases applied to the query results.
408408
* @see QuerySearchResult#getAggs()
409409
* @see QuerySearchResult#consumeProfileResult()
410410
*/
411411
static ReducedQueryPhase reducedQueryPhase(
412412
Collection<? extends SearchPhaseResult> queryResults,
413-
List<DelayableWriteable<InternalAggregations>> bufferedAggs,
413+
@Nullable InternalAggregations reducedAggs,
414414
List<TopDocs> bufferedTopDocs,
415415
TopDocsStats topDocsStats,
416416
int numReducePhases,
417417
boolean isScrollRequest,
418-
AggregationReduceContext.Builder aggReduceContextBuilder,
419-
QueryPhaseRankCoordinatorContext queryPhaseRankCoordinatorContext,
420-
boolean performFinalReduce
418+
QueryPhaseRankCoordinatorContext queryPhaseRankCoordinatorContext
421419
) {
422420
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
423421
numReducePhases++; // increment for this phase
@@ -521,12 +519,7 @@ static ReducedQueryPhase reducedQueryPhase(
521519
topDocsStats.timedOut,
522520
topDocsStats.terminatedEarly,
523521
reducedSuggest,
524-
bufferedAggs == null
525-
? null
526-
: InternalAggregations.topLevelReduceDelayable(
527-
bufferedAggs,
528-
performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()
529-
),
522+
reducedAggs,
530523
profileShardResults.isEmpty() ? null : new SearchProfileResultsBuilder(profileShardResults),
531524
sortedTopDocs,
532525
sortValueFormats,

server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.core.Nullable;
2121
import org.elasticsearch.search.SearchPhaseResult;
2222
import org.elasticsearch.search.SearchShardTarget;
23-
import org.elasticsearch.search.aggregations.AggregationReduceContext;
2423
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
2524
import org.elasticsearch.search.internal.SearchContext;
2625
import org.elasticsearch.search.internal.ShardSearchContextId;
@@ -313,17 +312,6 @@ protected Transport.Connection getConnection(String clusterAlias, DiscoveryNode
313312
* @param queryResults a list of non-null query shard results
314313
*/
315314
protected static SearchPhaseController.ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
316-
AggregationReduceContext.Builder aggReduceContextBuilder = new AggregationReduceContext.Builder() {
317-
@Override
318-
public AggregationReduceContext forPartialReduction() {
319-
throw new UnsupportedOperationException("Scroll requests don't have aggs");
320-
}
321-
322-
@Override
323-
public AggregationReduceContext forFinalReduction() {
324-
throw new UnsupportedOperationException("Scroll requests don't have aggs");
325-
}
326-
};
327315
final SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats(
328316
SearchContext.TRACK_TOTAL_HITS_ACCURATE
329317
);
@@ -339,16 +327,6 @@ public AggregationReduceContext forFinalReduction() {
339327
topDocs.add(td.topDocs);
340328
}
341329
}
342-
return SearchPhaseController.reducedQueryPhase(
343-
queryResults,
344-
null,
345-
topDocs,
346-
topDocsStats,
347-
0,
348-
true,
349-
aggReduceContextBuilder,
350-
null,
351-
true
352-
);
330+
return SearchPhaseController.reducedQueryPhase(queryResults, null, topDocs, topDocsStats, 0, true, null);
353331
}
354332
}

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
import org.apache.lucene.util.SetOnce;
1212
import org.elasticsearch.common.ParsingException;
13-
import org.elasticsearch.common.io.stream.DelayableWriteable;
1413
import org.elasticsearch.common.io.stream.StreamInput;
1514
import org.elasticsearch.common.io.stream.StreamOutput;
1615
import org.elasticsearch.common.io.stream.Writeable;
@@ -26,7 +25,6 @@
2625
import org.elasticsearch.xcontent.XContentParser;
2726

2827
import java.io.IOException;
29-
import java.util.AbstractList;
3028
import java.util.ArrayList;
3129
import java.util.Iterator;
3230
import java.util.List;
@@ -206,44 +204,22 @@ public SortValue sortValue(AggregationPath.PathElement head, Iterator<Aggregatio
206204
}
207205

208206
/**
209-
* Equivalent to {@link #topLevelReduce(List, AggregationReduceContext)} but it takes a list of
210-
* {@link DelayableWriteable}. The object will be expanded once via {@link DelayableWriteable#expand()}
211-
* but it is the responsibility of the caller to release those releasables.
207+
* Equivalent to {@link #topLevelReduce(List, AggregationReduceContext)} but it takes an iterator and a count.
212208
*/
213-
public static InternalAggregations topLevelReduceDelayable(
214-
List<DelayableWriteable<InternalAggregations>> delayableAggregations,
215-
AggregationReduceContext context
216-
) {
217-
final List<InternalAggregations> aggregations = new AbstractList<>() {
218-
@Override
219-
public InternalAggregations get(int index) {
220-
return delayableAggregations.get(index).expand();
221-
}
222-
223-
@Override
224-
public int size() {
225-
return delayableAggregations.size();
226-
}
227-
};
228-
return topLevelReduce(aggregations, context);
209+
public static InternalAggregations topLevelReduce(Iterator<InternalAggregations> aggs, int count, AggregationReduceContext context) {
210+
if (count == 0) {
211+
return null;
212+
}
213+
return maybeExecuteFinalReduce(context, count == 1 ? reduce(aggs.next(), context) : reduce(aggs, count, context));
229214
}
230215

231-
/**
232-
* Begin the reduction process. This should be the entry point for the "first" reduction, e.g. called by
233-
* SearchPhaseController or anywhere else that wants to initiate a reduction. It _should not_ be called
234-
* as an intermediate reduction step (e.g. in the middle of an aggregation tree).
235-
*
236-
* This method first reduces the aggregations, and if it is the final reduce, then reduce the pipeline
237-
* aggregations (both embedded parent/sibling as well as top-level sibling pipelines)
238-
*/
239-
public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, AggregationReduceContext context) {
240-
InternalAggregations reduced = reduce(aggregationsList, context);
216+
private static InternalAggregations maybeExecuteFinalReduce(AggregationReduceContext context, InternalAggregations reduced) {
241217
if (reduced == null) {
242218
return null;
243219
}
244220
if (context.isFinalReduce()) {
245-
List<InternalAggregation> reducedInternalAggs = reduced.getInternalAggregations();
246-
reducedInternalAggs = reducedInternalAggs.stream()
221+
List<InternalAggregation> reducedInternalAggs = reduced.getInternalAggregations()
222+
.stream()
247223
.map(agg -> agg.reducePipelines(agg, context, context.pipelineTreeRoot().subTree(agg.getName())))
248224
.collect(Collectors.toCollection(ArrayList::new));
249225

@@ -257,6 +233,18 @@ public static InternalAggregations topLevelReduce(List<InternalAggregations> agg
257233
return reduced;
258234
}
259235

236+
/**
237+
* Begin the reduction process. This should be the entry point for the "first" reduction, e.g. called by
238+
* SearchPhaseController or anywhere else that wants to initiate a reduction. It _should not_ be called
239+
* as an intermediate reduction step (e.g. in the middle of an aggregation tree).
240+
*
241+
* This method first reduces the aggregations, and if it is the final reduce, then reduce the pipeline
242+
* aggregations (both embedded parent/sibling as well as top-level sibling pipelines)
243+
*/
244+
public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, AggregationReduceContext context) {
245+
return maybeExecuteFinalReduce(context, reduce(aggregationsList, context));
246+
}
247+
260248
/**
261249
* Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first
262250
* {@link InternalAggregations} object found in the list.
@@ -280,6 +268,16 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
280268
}
281269
}
282270

271+
private static InternalAggregations reduce(Iterator<InternalAggregations> aggsIterator, int count, AggregationReduceContext context) {
272+
// general case
273+
var first = aggsIterator.next();
274+
try (AggregatorsReducer reducer = new AggregatorsReducer(first, context, count)) {
275+
reducer.accept(first);
276+
aggsIterator.forEachRemaining(reducer::accept);
277+
return reducer.get();
278+
}
279+
}
280+
283281
public static InternalAggregations reduce(InternalAggregations aggregations, AggregationReduceContext context) {
284282
final List<InternalAggregation> internalAggregations = aggregations.asList();
285283
int size = internalAggregations.size();

0 commit comments

Comments
 (0)