
Commit e885aa9

Latency improvements to Multi Term Aggregations (#14993)

* Avoid deep copy and other allocation improvements
* Refactoring based on PR Comments and added JavaDocs
* Added more comments
* Added character for Triggering Jenkins build
* Changes to cover collectZeroDocEntries method
* Updated comment based on change in method's functionality
* Added test to cover branches in collectZeroDocEntriesIfRequired
* Rebased and resolved changelog conflict

Signed-off-by: expani <[email protected]>
1 parent 146b0f7 commit e885aa9

File tree: 3 files changed, +116 -51 lines

CHANGELOG.md (+1)

@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))
 - New `phone` & `phone-search` analyzer + tokenizer ([#15915](https://github.com/opensearch-project/OpenSearch/pull/15915))
 - Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641))
+- Latency and Memory allocation improvements to Multi Term Aggregation queries ([#14993](https://github.com/opensearch-project/OpenSearch/pull/14993))

 ### Dependencies
 - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java (+59 -37)

@@ -33,6 +33,7 @@
 import org.opensearch.search.aggregations.InternalAggregation;
 import org.opensearch.search.aggregations.InternalOrder;
 import org.opensearch.search.aggregations.LeafBucketCollector;
+import org.opensearch.search.aggregations.bucket.BucketsAggregator;
 import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator;
 import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
 import org.opensearch.search.aggregations.support.AggregationPath;
@@ -215,19 +216,11 @@ public InternalAggregation buildEmptyAggregation() {

     @Override
     protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
-        MultiTermsValuesSourceCollector collector = multiTermsValue.getValues(ctx);
+        MultiTermsValuesSourceCollector collector = multiTermsValue.getValues(ctx, bucketOrds, this, sub);
         return new LeafBucketCollector() {
             @Override
             public void collect(int doc, long owningBucketOrd) throws IOException {
-                for (BytesRef compositeKey : collector.apply(doc)) {
-                    long bucketOrd = bucketOrds.add(owningBucketOrd, compositeKey);
-                    if (bucketOrd < 0) {
-                        bucketOrd = -1 - bucketOrd;
-                        collectExistingBucket(sub, doc, bucketOrd);
-                    } else {
-                        collectBucket(sub, doc, bucketOrd);
-                    }
-                }
+                collector.apply(doc, owningBucketOrd);
             }
         };
     }
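
Both the removed loop and the new inlined path rely on the same convention: bucketOrds.add returns a fresh non-negative ordinal for a first-seen key and -1 - ordinal for a key that already has a bucket, which is why callers decode negative values before calling collectExistingBucket. A standalone sketch of that convention, not part of this diff (OrdRegistry is a hypothetical stand-in, not an OpenSearch type):

import java.util.HashMap;
import java.util.Map;

// Hypothetical registry mirroring the add() convention used for bucket ordinals:
// a new key returns its fresh ordinal, an existing key returns -1 - ordinal.
final class OrdRegistry {
    private final Map<String, Long> ords = new HashMap<>();

    long add(String key) {
        Long existing = ords.get(key);
        if (existing != null) {
            return -1 - existing;              // negative signals "already present"
        }
        long ord = ords.size();
        ords.put(key, ord);
        return ord;
    }

    public static void main(String[] args) {
        OrdRegistry registry = new OrdRegistry();
        long first = registry.add("a|192.168.0.0");   // 0 -> new bucket, collectBucket(...)
        long second = registry.add("a|192.168.0.0");  // -1 -> decode as -1 - second, collectExistingBucket(...)
        System.out.println(first + " " + (-1 - second));
    }
}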
@@ -268,12 +261,10 @@ private void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOExcept
         }
         // we need to fill-in the blanks
         for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
-            MultiTermsValuesSourceCollector collector = multiTermsValue.getValues(ctx);
             // brute force
+            MultiTermsValuesSourceCollector collector = multiTermsValue.getValues(ctx, bucketOrds, null, null);
             for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
-                for (BytesRef compositeKey : collector.apply(docId)) {
-                    bucketOrds.add(owningBucketOrd, compositeKey);
-                }
+                collector.apply(docId, owningBucketOrd);
             }
         }
     }
@@ -284,10 +275,11 @@ private void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOExcept
     @FunctionalInterface
     interface MultiTermsValuesSourceCollector {
         /**
-         * Collect a list values of multi_terms on each doc.
-         * Each terms could have multi_values, so the result is the cartesian product of each term's values.
+         * Generates the cartesian product of all fields used in aggregation and
+         * collects them in buckets using the composite key of their field values.
         */
-        List<BytesRef> apply(int doc) throws IOException;
+        void apply(int doc, long owningBucketOrd) throws IOException;
+
    }

    @FunctionalInterface
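
Per the new Javadoc, each document contributes the cartesian product of its per-field values, and every combination becomes one composite bucket key. A self-contained illustration for a two-field aggregation, not part of this diff (the '|' separator is only for readability; the real keys are binary, written through scratch.writeVInt and TermValue#writeTo):

import java.util.ArrayList;
import java.util.List;

// Illustrative only: which composite keys a single document produces when
// one of its two fields is multi-valued. Two fields, so two nested loops.
public class CompositeKeySketch {
    public static void main(String[] args) {
        List<String> keywordValues = List.of("a", "b");     // multi-valued keyword field
        List<String> ipValues = List.of("192.168.0.0");     // single-valued ip field

        List<String> compositeKeys = new ArrayList<>();
        for (String keyword : keywordValues) {
            for (String ip : ipValues) {
                compositeKeys.add(keyword + "|" + ip);       // one bucket key per combination
            }
        }
        System.out.println(compositeKeys);                   // [a|192.168.0.0, b|192.168.0.0]
    }
}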
@@ -361,47 +353,72 @@ public MultiTermsValuesSource(List<InternalValuesSource> valuesSources) {
             this.valuesSources = valuesSources;
         }

-        public MultiTermsValuesSourceCollector getValues(LeafReaderContext ctx) throws IOException {
+        public MultiTermsValuesSourceCollector getValues(
+            LeafReaderContext ctx,
+            BytesKeyedBucketOrds bucketOrds,
+            BucketsAggregator aggregator,
+            LeafBucketCollector sub
+        ) throws IOException {
             List<InternalValuesSourceCollector> collectors = new ArrayList<>();
             for (InternalValuesSource valuesSource : valuesSources) {
                 collectors.add(valuesSource.apply(ctx));
             }
+            boolean collectBucketOrds = aggregator != null && sub != null;
             return new MultiTermsValuesSourceCollector() {
+
+                /**
+                 * This method does the following : <br>
+                 * <li>Fetches the values of every field present in the doc List<List<TermValue<?>>> via @{@link InternalValuesSourceCollector}</li>
+                 * <li>Generates Composite keys from the fetched values for all fields present in the aggregation.</li>
+                 * <li>Adds every composite key to the @{@link BytesKeyedBucketOrds} and Optionally collects them via @{@link BucketsAggregator#collectBucket(LeafBucketCollector, int, long)}</li>
+                 */
                 @Override
-                public List<BytesRef> apply(int doc) throws IOException {
+                public void apply(int doc, long owningBucketOrd) throws IOException {
+                    // TODO A new list creation can be avoided for every doc.
                     List<List<TermValue<?>>> collectedValues = new ArrayList<>();
                     for (InternalValuesSourceCollector collector : collectors) {
                         collectedValues.add(collector.apply(doc));
                     }
-                    List<BytesRef> result = new ArrayList<>();
                     scratch.seek(0);
                     scratch.writeVInt(collectors.size()); // number of fields per composite key
-                    cartesianProduct(result, scratch, collectedValues, 0);
-                    return result;
+                    generateAndCollectCompositeKeys(collectedValues, 0, owningBucketOrd, doc);
                 }

                 /**
-                 * Cartesian product using depth first search.
-                 *
-                 * <p>
-                 * Composite keys are encoded to a {@link BytesRef} in a format compatible with {@link StreamOutput::writeGenericValue},
-                 * but reuses the encoding of the shared prefixes from the previous levels to avoid wasteful work.
+                 * This generates and collects all Composite keys in their buckets by performing a cartesian product <br>
+                 * of all the values in all the fields ( used in agg ) for the given doc recursively.
+                 * @param collectedValues : Values of all fields present in the aggregation for the @doc
+                 * @param index : Points to the field being added to generate the composite key
                  */
-                private void cartesianProduct(
-                    List<BytesRef> compositeKeys,
-                    BytesStreamOutput scratch,
+                private void generateAndCollectCompositeKeys(
                     List<List<TermValue<?>>> collectedValues,
-                    int index
+                    int index,
+                    long owningBucketOrd,
+                    int doc
                 ) throws IOException {
                     if (collectedValues.size() == index) {
-                        compositeKeys.add(BytesRef.deepCopyOf(scratch.bytes().toBytesRef()));
+                        // Avoid performing a deep copy of the composite key by inlining.
+                        long bucketOrd = bucketOrds.add(owningBucketOrd, scratch.bytes().toBytesRef());
+                        if (collectBucketOrds) {
+                            if (bucketOrd < 0) {
+                                bucketOrd = -1 - bucketOrd;
+                                aggregator.collectExistingBucket(sub, doc, bucketOrd);
+                            } else {
+                                aggregator.collectBucket(sub, doc, bucketOrd);
+                            }
+                        }
                         return;
                     }

                     long position = scratch.position();
-                    for (TermValue<?> value : collectedValues.get(index)) {
+                    List<TermValue<?>> values = collectedValues.get(index);
+                    int numIterations = values.size();
+                    // For each loop is not done to reduce the allocations done for Iterator objects
+                    // once for every field in every doc.
+                    for (int i = 0; i < numIterations; i++) {
+                        TermValue<?> value = values.get(i);
                         value.writeTo(scratch); // encode the value
-                        cartesianProduct(compositeKeys, scratch, collectedValues, index + 1); // dfs
+                        generateAndCollectCompositeKeys(collectedValues, index + 1, owningBucketOrd, doc); // dfs
                         scratch.seek(position); // backtrack
                     }
                 }
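
The rewritten recursion appends one field value per level to the shared scratch, recurses, then rewinds to the saved position so sibling values reuse the already-encoded prefix; each finished key is handed straight to bucketOrds.add, which is what lets the old per-key BytesRef.deepCopyOf be dropped (assuming, as the inline comment implies, that the ordinals structure keeps its own copy of the key bytes). A standalone sketch of the same traversal, not part of this diff, with a StringBuilder standing in for the BytesStreamOutput scratch:

import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch (not OpenSearch code): depth-first composite key generation
// over a single reusable buffer, with backtracking instead of per-key allocations.
public class SharedScratchSketch {

    static void generate(List<List<String>> perFieldValues, int index, StringBuilder scratch, Consumer<String> sink) {
        if (index == perFieldValues.size()) {
            sink.accept(scratch.toString());          // hand the finished key to the sink right away
            return;
        }
        int position = scratch.length();              // analogue of scratch.position()
        List<String> values = perFieldValues.get(index);
        for (int i = 0; i < values.size(); i++) {     // indexed loop: no Iterator allocated per field per doc
            scratch.append(values.get(i)).append('|');
            generate(perFieldValues, index + 1, scratch, sink);
            scratch.setLength(position);              // analogue of scratch.seek(position): backtrack
        }
    }

    public static void main(String[] args) {
        List<List<String>> perField = List.of(List.of("a", "b"), List.of("192.168.0.0", "192.168.0.1"));
        generate(perField, 0, new StringBuilder(), key -> System.out.println("add to bucketOrds: " + key));
    }
}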
@@ -441,9 +458,14 @@ static InternalValuesSource bytesValuesSource(ValuesSource valuesSource, Include
                     if (i > 0 && bytes.equals(previous)) {
                         continue;
                     }
-                    BytesRef copy = BytesRef.deepCopyOf(bytes);
-                    termValues.add(TermValue.of(copy));
-                    previous = copy;
+                    // Performing a deep copy is not required for field containing only one value.
+                    if (valuesCount > 1) {
+                        BytesRef copy = BytesRef.deepCopyOf(bytes);
+                        termValues.add(TermValue.of(copy));
+                        previous = copy;
+                    } else {
+                        termValues.add(TermValue.of(bytes));
+                    }
                 }
                 return termValues;
             };
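
This last hunk skips BytesRef.deepCopyOf when a field has exactly one value for the document. A standalone sketch, not OpenSearch code, of why the copy matters only when several values must be held at once from a source that reuses its backing buffer, under the assumption that a single, immediately consumed value never sees the buffer refilled:

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: a value source that reuses one backing buffer across
// nextValue() calls, the way doc-values iterators reuse a BytesRef.
public class ConditionalCopySketch {

    // Hypothetical reusing source: every call refills and returns the same buffer.
    static final class ReusingSource {
        private final List<String> docValues;
        private final StringBuilder shared = new StringBuilder();
        private int next = 0;

        ReusingSource(List<String> docValues) {
            this.docValues = docValues;
        }

        int docValueCount() {
            return docValues.size();
        }

        CharSequence nextValue() {
            shared.setLength(0);
            shared.append(docValues.get(next++));
            return shared;                     // same object every time, new contents
        }
    }

    // Copy only when a later nextValue() call would overwrite the buffer before the
    // earlier value is consumed; a single-valued field can hand the buffer out as-is,
    // provided the caller uses it before the source moves on.
    static List<CharSequence> readAll(ReusingSource source) {
        int valuesCount = source.docValueCount();
        List<CharSequence> out = new ArrayList<>(valuesCount);
        for (int i = 0; i < valuesCount; i++) {
            CharSequence value = source.nextValue();
            if (valuesCount > 1) {
                out.add(value.toString());     // deep copy, as in the multi-valued branch above
            } else {
                out.add(value);                // no copy, as in the new single-valued branch
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll(new ReusingSource(List.of("a"))));
        System.out.println(readAll(new ReusingSource(List.of("a", "b"))));
    }
}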

server/src/test/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregatorTests.java (+56 -14)

@@ -126,6 +126,19 @@ public class MultiTermsAggregatorTests extends AggregatorTestCase {

     private static final Consumer<MultiTermsAggregationBuilder> NONE_DECORATOR = null;

+    private static final Consumer<InternalMultiTerms> IP_AND_KEYWORD_DESC_ORDER_VERIFY = h -> {
+        MatcherAssert.assertThat(h.getBuckets(), hasSize(3));
+        MatcherAssert.assertThat(h.getBuckets().get(0).getKey(), contains(equalTo("a"), equalTo("192.168.0.0")));
+        MatcherAssert.assertThat(h.getBuckets().get(0).getKeyAsString(), equalTo("a|192.168.0.0"));
+        MatcherAssert.assertThat(h.getBuckets().get(0).getDocCount(), equalTo(2L));
+        MatcherAssert.assertThat(h.getBuckets().get(1).getKey(), contains(equalTo("b"), equalTo("192.168.0.1")));
+        MatcherAssert.assertThat(h.getBuckets().get(1).getKeyAsString(), equalTo("b|192.168.0.1"));
+        MatcherAssert.assertThat(h.getBuckets().get(1).getDocCount(), equalTo(1L));
+        MatcherAssert.assertThat(h.getBuckets().get(2).getKey(), contains(equalTo("c"), equalTo("192.168.0.2")));
+        MatcherAssert.assertThat(h.getBuckets().get(2).getKeyAsString(), equalTo("c|192.168.0.2"));
+        MatcherAssert.assertThat(h.getBuckets().get(2).getDocCount(), equalTo(1L));
+    };
+
     @Override
     protected List<ValuesSourceType> getSupportedValuesSourceTypes() {
         return Collections.unmodifiableList(
@@ -672,8 +685,48 @@ public void testDatesFieldFormat() throws IOException {
         );
     }

-    public void testIpAndKeyword() throws IOException {
-        testAggregation(new MatchAllDocsQuery(), fieldConfigs(asList(KEYWORD_FIELD, IP_FIELD)), NONE_DECORATOR, iw -> {
+    public void testIpAndKeywordDefaultDescOrder() throws IOException {
+        ipAndKeywordTest(NONE_DECORATOR, IP_AND_KEYWORD_DESC_ORDER_VERIFY);
+    }
+
+    public void testIpAndKeywordWithBucketCountSameAsSize() throws IOException {
+        ipAndKeywordTest(multiTermsAggregationBuilder -> {
+            multiTermsAggregationBuilder.minDocCount(0);
+            multiTermsAggregationBuilder.size(3);
+            multiTermsAggregationBuilder.order(BucketOrder.compound(BucketOrder.count(false)));
+        }, IP_AND_KEYWORD_DESC_ORDER_VERIFY);
+    }
+
+    public void testIpAndKeywordWithBucketCountGreaterThanSize() throws IOException {
+        ipAndKeywordTest(multiTermsAggregationBuilder -> {
+            multiTermsAggregationBuilder.minDocCount(0);
+            multiTermsAggregationBuilder.size(10);
+            multiTermsAggregationBuilder.order(BucketOrder.compound(BucketOrder.count(false)));
+        }, IP_AND_KEYWORD_DESC_ORDER_VERIFY);
+    }
+
+    public void testIpAndKeywordAscOrder() throws IOException {
+        ipAndKeywordTest(multiTermsAggregationBuilder -> {
+            multiTermsAggregationBuilder.minDocCount(0);
+            multiTermsAggregationBuilder.size(3);
+            multiTermsAggregationBuilder.order(BucketOrder.compound(BucketOrder.count(true)));
+        }, h -> {
+            MatcherAssert.assertThat(h.getBuckets(), hasSize(3));
+            MatcherAssert.assertThat(h.getBuckets().get(0).getKey(), contains(equalTo("b"), equalTo("192.168.0.1")));
+            MatcherAssert.assertThat(h.getBuckets().get(0).getKeyAsString(), equalTo("b|192.168.0.1"));
+            MatcherAssert.assertThat(h.getBuckets().get(0).getDocCount(), equalTo(1L));
+            MatcherAssert.assertThat(h.getBuckets().get(1).getKey(), contains(equalTo("c"), equalTo("192.168.0.2")));
+            MatcherAssert.assertThat(h.getBuckets().get(1).getKeyAsString(), equalTo("c|192.168.0.2"));
+            MatcherAssert.assertThat(h.getBuckets().get(1).getDocCount(), equalTo(1L));
+            MatcherAssert.assertThat(h.getBuckets().get(2).getKey(), contains(equalTo("a"), equalTo("192.168.0.0")));
+            MatcherAssert.assertThat(h.getBuckets().get(2).getKeyAsString(), equalTo("a|192.168.0.0"));
+            MatcherAssert.assertThat(h.getBuckets().get(2).getDocCount(), equalTo(2L));
+        });
+    }
+
+    private void ipAndKeywordTest(Consumer<MultiTermsAggregationBuilder> builderDecorator, Consumer<InternalMultiTerms> verify)
+        throws IOException {
+        testAggregation(new MatchAllDocsQuery(), fieldConfigs(asList(KEYWORD_FIELD, IP_FIELD)), builderDecorator, iw -> {
             iw.addDocument(
                 asList(
                     new SortedDocValuesField(KEYWORD_FIELD, new BytesRef("a")),
@@ -698,18 +751,7 @@ public void testIpAndKeyword() throws IOException {
                     new SortedDocValuesField(IP_FIELD, new BytesRef(InetAddressPoint.encode(InetAddresses.forString("192.168.0.0"))))
                 )
             );
-        }, h -> {
-            MatcherAssert.assertThat(h.getBuckets(), hasSize(3));
-            MatcherAssert.assertThat(h.getBuckets().get(0).getKey(), contains(equalTo("a"), equalTo("192.168.0.0")));
-            MatcherAssert.assertThat(h.getBuckets().get(0).getKeyAsString(), equalTo("a|192.168.0.0"));
-            MatcherAssert.assertThat(h.getBuckets().get(0).getDocCount(), equalTo(2L));
-            MatcherAssert.assertThat(h.getBuckets().get(1).getKey(), contains(equalTo("b"), equalTo("192.168.0.1")));
-            MatcherAssert.assertThat(h.getBuckets().get(1).getKeyAsString(), equalTo("b|192.168.0.1"));
-            MatcherAssert.assertThat(h.getBuckets().get(1).getDocCount(), equalTo(1L));
-            MatcherAssert.assertThat(h.getBuckets().get(2).getKey(), contains(equalTo("c"), equalTo("192.168.0.2")));
-            MatcherAssert.assertThat(h.getBuckets().get(2).getKeyAsString(), equalTo("c|192.168.0.2"));
-            MatcherAssert.assertThat(h.getBuckets().get(2).getDocCount(), equalTo(1L));
-        });
+        }, verify);
     }

     public void testEmpty() throws IOException {
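
The refactored tests share one indexing setup across orderings through two Consumers: a builder decorator (null meaning "no decoration", as with NONE_DECORATOR) and a result verifier. A minimal sketch of that pattern outside the OpenSearch test harness, not part of this diff (Builder, Result, and execute are stand-ins, not real types):

import java.util.function.Consumer;

// Illustrative sketch: one shared test body, parameterised by a builder decorator
// and a result verifier, mirroring ipAndKeywordTest(decorator, verify).
final class DecoratedTestSketch {

    static final class Builder {               // stand-in for MultiTermsAggregationBuilder
        int size = 10;
        int minDocCount = 1;
        Builder size(int s) { this.size = s; return this; }
        Builder minDocCount(int m) { this.minDocCount = m; return this; }
    }

    record Result(int bucketCount) {}          // stand-in for InternalMultiTerms

    static void runCase(Consumer<Builder> decorator, Consumer<Result> verify) {
        Builder builder = new Builder();
        if (decorator != null) {               // mirrors the NONE_DECORATOR == null convention
            decorator.accept(builder);
        }
        Result result = execute(builder);      // index the fixed docs, run the aggregation
        verify.accept(result);
    }

    private static Result execute(Builder b) {
        // hypothetical: pretend the fixed dataset always yields 3 buckets
        return new Result(Math.min(3, b.size));
    }

    public static void main(String[] args) {
        runCase(null, DecoratedTestSketch::assertThreeBuckets);                          // default ordering
        runCase(b -> b.minDocCount(0).size(3), DecoratedTestSketch::assertThreeBuckets); // decorated case
    }

    private static void assertThreeBuckets(Result r) {
        if (r.bucketCount() != 3) throw new AssertionError("expected 3 buckets, got " + r.bucketCount());
    }
}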
