Skip to content

Commit 091ab6f

Browse files
[Star tree] Doc count field support in star tree (#15282)
--------- Signed-off-by: Bharathwaj G <[email protected]>
1 parent 771949d commit 091ab6f

File tree

14 files changed

+644
-205
lines changed

14 files changed

+644
-205
lines changed

server/src/internalClusterTest/java/org/opensearch/index/mapper/StarTreeMapperIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,11 +264,16 @@ public void testValidCompositeIndex() {
264264
);
265265
assertEquals(expectedTimeUnits, dateDim.getIntervals());
266266
assertEquals("numeric_dv", starTreeFieldType.getDimensions().get(1).getField());
267+
assertEquals(2, starTreeFieldType.getMetrics().size());
267268
assertEquals("numeric_dv", starTreeFieldType.getMetrics().get(0).getField());
268269

269270
// Assert default metrics
270271
List<MetricStat> expectedMetrics = Arrays.asList(MetricStat.VALUE_COUNT, MetricStat.SUM, MetricStat.AVG);
271272
assertEquals(expectedMetrics, starTreeFieldType.getMetrics().get(0).getMetrics());
273+
274+
assertEquals("_doc_count", starTreeFieldType.getMetrics().get(1).getField());
275+
assertEquals(List.of(MetricStat.DOC_COUNT), starTreeFieldType.getMetrics().get(1).getMetrics());
276+
272277
assertEquals(10000, starTreeFieldType.getStarTreeConfig().maxLeafDocs());
273278
assertEquals(
274279
StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP,

server/src/main/java/org/opensearch/index/codec/composite/composite99/Composite99DocValuesWriter.java

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.lucene.index.EmptyDocValuesProducer;
1616
import org.apache.lucene.index.FieldInfo;
1717
import org.apache.lucene.index.MergeState;
18+
import org.apache.lucene.index.NumericDocValues;
1819
import org.apache.lucene.index.SegmentWriteState;
1920
import org.apache.lucene.index.SortedNumericDocValues;
2021
import org.opensearch.common.annotation.ExperimentalApi;
@@ -25,6 +26,7 @@
2526
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
2627
import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder;
2728
import org.opensearch.index.mapper.CompositeMappedFieldType;
29+
import org.opensearch.index.mapper.DocCountFieldMapper;
2830
import org.opensearch.index.mapper.MapperService;
2931

3032
import java.io.IOException;
@@ -63,21 +65,29 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState
6365
this.compositeMappedFieldTypes = mapperService.getCompositeFieldTypes();
6466
compositeFieldSet = new HashSet<>();
6567
segmentFieldSet = new HashSet<>();
68+
// TODO : add integ test for this
6669
for (FieldInfo fi : segmentWriteState.fieldInfos) {
6770
if (DocValuesType.SORTED_NUMERIC.equals(fi.getDocValuesType())) {
6871
segmentFieldSet.add(fi.name);
72+
} else if (fi.name.equals(DocCountFieldMapper.NAME)) {
73+
segmentFieldSet.add(fi.name);
6974
}
7075
}
7176
for (CompositeMappedFieldType type : compositeMappedFieldTypes) {
7277
compositeFieldSet.addAll(type.fields());
7378
}
7479
// check if there are any composite fields which are part of the segment
80+
// TODO : add integ test where there are no composite fields in a segment, test both flush and merge cases
7581
segmentHasCompositeFields = Collections.disjoint(segmentFieldSet, compositeFieldSet) == false;
7682
}
7783

7884
@Override
7985
public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
8086
delegate.addNumericField(field, valuesProducer);
87+
// Perform this only during flush flow
88+
if (mergeState.get() == null && segmentHasCompositeFields) {
89+
createCompositeIndicesIfPossible(valuesProducer, field);
90+
}
8191
}
8292

8393
@Override
@@ -119,13 +129,7 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
119129
if (segmentFieldSet.isEmpty()) {
120130
Set<String> compositeFieldSetCopy = new HashSet<>(compositeFieldSet);
121131
for (String compositeField : compositeFieldSetCopy) {
122-
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
123-
@Override
124-
public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
125-
return DocValues.emptySortedNumeric();
126-
}
127-
});
128-
compositeFieldSet.remove(compositeField);
132+
addDocValuesForEmptyField(compositeField);
129133
}
130134
}
131135
// we have all the required fields to build composite fields
@@ -138,7 +142,28 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
138142
}
139143
}
140144
}
145+
}
141146

147+
/**
148+
* Add empty doc values for fields not present in segment
149+
*/
150+
private void addDocValuesForEmptyField(String compositeField) {
151+
if (compositeField.equals(DocCountFieldMapper.NAME)) {
152+
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
153+
@Override
154+
public NumericDocValues getNumeric(FieldInfo field) {
155+
return DocValues.emptyNumeric();
156+
}
157+
});
158+
} else {
159+
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
160+
@Override
161+
public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
162+
return DocValues.emptySortedNumeric();
163+
}
164+
});
165+
}
166+
compositeFieldSet.remove(compositeField);
142167
}
143168

144169
@Override

server/src/main/java/org/opensearch/index/compositeindex/datacube/MetricStat.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,26 @@ public enum MetricStat {
2424
SUM("sum"),
2525
MIN("min"),
2626
MAX("max"),
27-
AVG("avg", VALUE_COUNT, SUM);
27+
AVG("avg", VALUE_COUNT, SUM),
28+
DOC_COUNT("doc_count", true);
2829

2930
private final String typeName;
3031
private final MetricStat[] baseMetrics;
3132

33+
// System field stats cannot be used as input for user metric types
34+
private final boolean isSystemFieldStat;
35+
36+
MetricStat(String typeName) {
37+
this(typeName, false);
38+
}
39+
3240
MetricStat(String typeName, MetricStat... baseMetrics) {
41+
this(typeName, false, baseMetrics);
42+
}
43+
44+
MetricStat(String typeName, boolean isSystemFieldStat, MetricStat... baseMetrics) {
3345
this.typeName = typeName;
46+
this.isSystemFieldStat = isSystemFieldStat;
3447
this.baseMetrics = baseMetrics;
3548
}
3649

@@ -56,7 +69,8 @@ public boolean isDerivedMetric() {
5669

5770
public static MetricStat fromTypeName(String typeName) {
5871
for (MetricStat metric : MetricStat.values()) {
59-
if (metric.getTypeName().equalsIgnoreCase(typeName)) {
72+
// prevent system fields to be entered as user input
73+
if (metric.getTypeName().equalsIgnoreCase(typeName) && metric.isSystemFieldStat == false) {
6074
return metric;
6175
}
6276
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/StarTreeValidator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.index.compositeindex.datacube.Dimension;
1515
import org.opensearch.index.compositeindex.datacube.Metric;
1616
import org.opensearch.index.mapper.CompositeMappedFieldType;
17+
import org.opensearch.index.mapper.DocCountFieldMapper;
1718
import org.opensearch.index.mapper.MappedFieldType;
1819
import org.opensearch.index.mapper.MapperService;
1920
import org.opensearch.index.mapper.StarTreeMapper;
@@ -78,7 +79,7 @@ public static void validate(MapperService mapperService, CompositeIndexSettings
7879
String.format(Locale.ROOT, "unknown metric field [%s] as part of star tree field", metric.getField())
7980
);
8081
}
81-
if (ft.isAggregatable() == false) {
82+
if (ft.isAggregatable() == false && ft instanceof DocCountFieldMapper.DocCountFieldType == false) {
8283
throw new IllegalArgumentException(
8384
String.format(
8485
Locale.ROOT,

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/CountValueAggregator.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,9 @@
1717
class CountValueAggregator implements ValueAggregator<Long> {
1818

1919
public static final long DEFAULT_INITIAL_VALUE = 1L;
20-
private final StarTreeNumericType starTreeNumericType;
2120
private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;
2221

23-
public CountValueAggregator(StarTreeNumericType starTreeNumericType) {
24-
this.starTreeNumericType = starTreeNumericType;
25-
}
22+
public CountValueAggregator() {}
2623

2724
@Override
2825
public StarTreeNumericType getAggregatedValueType() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.compositeindex.datacube.startree.aggregators;
10+
11+
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
12+
13+
/**
14+
* Aggregator to handle '_doc_count' field
15+
*
16+
* @opensearch.experimental
17+
*/
18+
public class DocCountAggregator implements ValueAggregator<Long> {
19+
20+
private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;
21+
22+
public DocCountAggregator() {}
23+
24+
@Override
25+
public StarTreeNumericType getAggregatedValueType() {
26+
return VALUE_AGGREGATOR_TYPE;
27+
}
28+
29+
/**
30+
* If _doc_count field for a doc is missing, we increment the _doc_count by '1' for the associated doc
31+
* otherwise take the actual value present in the field
32+
*/
33+
@Override
34+
public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
35+
if (segmentDocValue == null) {
36+
return getIdentityMetricValue();
37+
}
38+
return segmentDocValue;
39+
}
40+
41+
@Override
42+
public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue) {
43+
assert value != null;
44+
return mergeAggregatedValues(value, segmentDocValue);
45+
}
46+
47+
@Override
48+
public Long mergeAggregatedValues(Long value, Long aggregatedValue) {
49+
if (value == null) {
50+
value = getIdentityMetricValue();
51+
}
52+
if (aggregatedValue == null) {
53+
aggregatedValue = getIdentityMetricValue();
54+
}
55+
return value + aggregatedValue;
56+
}
57+
58+
@Override
59+
public Long toAggregatedValueType(Long rawValue) {
60+
return rawValue;
61+
}
62+
63+
/**
64+
* If _doc_count field for a doc is missing, we increment the _doc_count by '1' for the associated doc
65+
*/
66+
@Override
67+
public Long getIdentityMetricValue() {
68+
return 1L;
69+
}
70+
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregatorFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@ public static ValueAggregator getValueAggregator(MetricStat aggregationType, Sta
3131
case SUM:
3232
return new SumValueAggregator(starTreeNumericType);
3333
case VALUE_COUNT:
34-
return new CountValueAggregator(starTreeNumericType);
34+
return new CountValueAggregator();
3535
case MIN:
3636
return new MinValueAggregator(starTreeNumericType);
3737
case MAX:
3838
return new MaxValueAggregator(starTreeNumericType);
39+
case DOC_COUNT:
40+
return new DocCountAggregator();
3941
default:
4042
throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
4143
}

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
1212
import org.apache.lucene.codecs.DocValuesProducer;
13+
import org.apache.lucene.index.DocValues;
1314
import org.apache.lucene.index.DocValuesType;
1415
import org.apache.lucene.index.FieldInfo;
1516
import org.apache.lucene.index.IndexOptions;
@@ -28,6 +29,7 @@
2829
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
2930
import org.opensearch.index.compositeindex.datacube.startree.utils.TreeNode;
3031
import org.opensearch.index.fielddata.IndexNumericFieldData;
32+
import org.opensearch.index.mapper.DocCountFieldMapper;
3133
import org.opensearch.index.mapper.Mapper;
3234
import org.opensearch.index.mapper.MapperService;
3335
import org.opensearch.index.mapper.NumberFieldMapper;
@@ -117,6 +119,16 @@ protected BaseStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState sta
117119
public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService mapperService) {
118120
List<MetricAggregatorInfo> metricAggregatorInfos = new ArrayList<>();
119121
for (Metric metric : this.starTreeField.getMetrics()) {
122+
if (metric.getField().equals(DocCountFieldMapper.NAME)) {
123+
MetricAggregatorInfo metricAggregatorInfo = new MetricAggregatorInfo(
124+
MetricStat.DOC_COUNT,
125+
metric.getField(),
126+
starTreeField.getName(),
127+
IndexNumericFieldData.NumericType.LONG
128+
);
129+
metricAggregatorInfos.add(metricAggregatorInfo);
130+
continue;
131+
}
120132
for (MetricStat metricStat : metric.getMetrics()) {
121133
if (metricStat.isDerivedMetric()) {
122134
continue;
@@ -429,7 +441,7 @@ public void build(Map<String, DocValuesProducer> fieldProducerMap) throws IOExce
429441
String dimension = dimensionsSplitOrder.get(i).getField();
430442
FieldInfo dimensionFieldInfo = state.fieldInfos.fieldInfo(dimension);
431443
if (dimensionFieldInfo == null) {
432-
dimensionFieldInfo = getFieldInfo(dimension);
444+
dimensionFieldInfo = getFieldInfo(dimension, DocValuesType.SORTED_NUMERIC);
433445
}
434446
dimensionReaders[i] = new SequentialDocValuesIterator(
435447
fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo)
@@ -441,15 +453,15 @@ public void build(Map<String, DocValuesProducer> fieldProducerMap) throws IOExce
441453
logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime));
442454
}
443455

444-
private static FieldInfo getFieldInfo(String field) {
456+
private static FieldInfo getFieldInfo(String field, DocValuesType docValuesType) {
445457
return new FieldInfo(
446458
field,
447-
1,
459+
1, // This is filled as part of doc values creation and is not used otherwise
448460
false,
449461
false,
450462
false,
451463
IndexOptions.NONE,
452-
DocValuesType.SORTED_NUMERIC,
464+
docValuesType,
453465
-1,
454466
Collections.emptyMap(),
455467
0,
@@ -473,20 +485,44 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
473485
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
474486
for (Metric metric : this.starTreeField.getMetrics()) {
475487
for (MetricStat metricStat : metric.getMetrics()) {
488+
SequentialDocValuesIterator metricReader = null;
476489
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
477-
if (metricFieldInfo == null) {
478-
metricFieldInfo = getFieldInfo(metric.getField());
490+
if (metricStat.equals(MetricStat.DOC_COUNT)) {
491+
// _doc_count is numeric field , so we convert to sortedNumericDocValues and get iterator
492+
metricReader = getIteratorForNumericField(fieldProducerMap, metricFieldInfo, DocCountFieldMapper.NAME);
493+
} else {
494+
if (metricFieldInfo == null) {
495+
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
496+
}
497+
metricReader = new SequentialDocValuesIterator(
498+
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
499+
);
479500
}
480-
481-
SequentialDocValuesIterator metricReader = new SequentialDocValuesIterator(
482-
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
483-
);
484501
metricReaders.add(metricReader);
485502
}
486503
}
487504
return metricReaders;
488505
}
489506

507+
/**
508+
* Converts numericDocValues to sortedNumericDocValues and returns SequentialDocValuesIterator
509+
*/
510+
private SequentialDocValuesIterator getIteratorForNumericField(
511+
Map<String, DocValuesProducer> fieldProducerMap,
512+
FieldInfo fieldInfo,
513+
String name
514+
) throws IOException {
515+
if (fieldInfo == null) {
516+
fieldInfo = getFieldInfo(name, DocValuesType.NUMERIC);
517+
}
518+
SequentialDocValuesIterator sequentialDocValuesIterator;
519+
assert fieldProducerMap.containsKey(fieldInfo.name);
520+
sequentialDocValuesIterator = new SequentialDocValuesIterator(
521+
DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo))
522+
);
523+
return sequentialDocValuesIterator;
524+
}
525+
490526
/**
491527
* Builds the star tree using Star-Tree Document
492528
*

server/src/main/java/org/opensearch/index/mapper/StarTreeMapper.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,10 @@ private List<Metric> buildMetrics(String fieldName, Map<String, Object> map, Map
226226
for (Object metric : metricsList) {
227227
Map<String, Object> metricMap = (Map<String, Object>) metric;
228228
String name = (String) XContentMapValues.extractValue(CompositeDataCubeFieldType.NAME, metricMap);
229+
// Handle _doc_count metric separately at the end
230+
if (name.equals(DocCountFieldMapper.NAME)) {
231+
continue;
232+
}
229233
metricMap.remove(CompositeDataCubeFieldType.NAME);
230234
if (objbuilder == null || objbuilder.mappersBuilders == null) {
231235
metrics.add(getMetric(name, metricMap, context));
@@ -250,7 +254,8 @@ private List<Metric> buildMetrics(String fieldName, Map<String, Object> map, Map
250254
} else {
251255
throw new MapperParsingException(String.format(Locale.ROOT, "unable to parse metrics for star tree field [%s]", this.name));
252256
}
253-
257+
Metric docCountMetric = new Metric(DocCountFieldMapper.NAME, List.of(MetricStat.DOC_COUNT));
258+
metrics.add(docCountMetric);
254259
return metrics;
255260
}
256261

0 commit comments

Comments
 (0)