
Commit 8b62948

[Derived Field] Dynamic FieldType inference based on random sampling of documents (#13592) (#13953)
(cherry picked from commit 6c1896b)

Signed-off-by: Rishabh Maurya <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 7d5c038 · commit 8b62948

2 files changed: +391 −0 lines changed
FieldTypeInference.java
@@ -0,0 +1,181 @@

/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.mapper;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.ReaderUtil;
import org.opensearch.common.Randomness;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.lookup.SourceLookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

/**
 * This class performs type inference by analyzing the _source documents. It uses a random sample of documents to infer the field type,
 * similar to the dynamic-mapping type-guessing logic. Unlike guessing based on the first document, where the field could be missing,
 * this method draws a random sample to make a more accurate inference. This approach is especially useful for handling missing fields,
 * which is common in nested fields within derived fields of object types.
 *
 * <p>The sample size should be chosen carefully to ensure a high probability of selecting at least one document where the field is
 * present. However, it's essential to strike a balance, because a large sample size can lead to performance issues: each sampled
 * document's _source is loaded and examined until the field is found.
 *
 * <p>Determining the sample size ({@code S}) is akin to deciding how many balls to draw from a bin so that, with high probability
 * ({@code >= P}), at least one green ball (a document with the field) is drawn from a mixture of {@code R} red balls (documents
 * without the field) and {@code G} green balls:
 * <pre>{@code
 * P >= 1 - C(R, S) / C(R + G, S)
 * }</pre>
 * Here, {@code C()} represents the binomial coefficient.
 * For a high confidence level, we aim for {@code P >= 0.95}. For example, with {@code 10^7} documents where the field is present in
 * {@code 2%} of them, the sample size {@code S} should be around 149 to achieve a probability of {@code 0.95}.
 */
public class FieldTypeInference {
    private final IndexReader indexReader;
    private final String indexName;
    private final MapperService mapperService;
    // TODO expose using an index setting
    private int sampleSize;
    private static final int DEFAULT_SAMPLE_SIZE = 150;
    private static final int MAX_SAMPLE_SIZE_ALLOWED = 1000;

    public FieldTypeInference(String indexName, MapperService mapperService, IndexReader indexReader) {
        this.indexName = indexName;
        this.mapperService = mapperService;
        this.indexReader = indexReader;
        this.sampleSize = DEFAULT_SAMPLE_SIZE;
    }

    public void setSampleSize(int sampleSize) {
        if (sampleSize > MAX_SAMPLE_SIZE_ALLOWED) {
            throw new IllegalArgumentException("sample_size should not exceed " + MAX_SAMPLE_SIZE_ALLOWED);
        }
        this.sampleSize = sampleSize;
    }

    public int getSampleSize() {
        return sampleSize;
    }

    public Mapper infer(ValueFetcher valueFetcher) throws IOException {
        RandomSourceValuesGenerator valuesGenerator = new RandomSourceValuesGenerator(sampleSize, indexReader, valueFetcher);
        Mapper inferredMapper = null;
        while (inferredMapper == null && valuesGenerator.hasNext()) {
            List<Object> values = valuesGenerator.next();
            if (values == null || values.isEmpty()) {
                continue;
            }
            // always use the first value of a multi-valued field to infer the type
            inferredMapper = inferTypeFromObject(values.get(0));
        }
        return inferredMapper;
    }

    private Mapper inferTypeFromObject(Object o) throws IOException {
        if (o == null) {
            return null;
        }
        DocumentMapper mapper = mapperService.documentMapper();
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("field", o).endObject();
        BytesReference bytesReference = BytesReference.bytes(builder);
        SourceToParse sourceToParse = new SourceToParse(indexName, "_id", bytesReference, JsonXContent.jsonXContent.mediaType());
        ParsedDocument parsedDocument = mapper.parse(sourceToParse);
        Mapping mapping = parsedDocument.dynamicMappingsUpdate();
        return mapping.root.getMapper("field");
    }

    private static class RandomSourceValuesGenerator implements Iterator<List<Object>> {
        private final ValueFetcher valueFetcher;
        private final IndexReader indexReader;
        private final SourceLookup sourceLookup;
        private final int[] docs;
        private int iter;
        private int leaf;
        private final int MAX_ATTEMPTS_TO_GENERATE_RANDOM_SAMPLES = 10000;

        public RandomSourceValuesGenerator(int sampleSize, IndexReader indexReader, ValueFetcher valueFetcher) {
            this.valueFetcher = valueFetcher;
            this.indexReader = indexReader;
            sampleSize = Math.min(sampleSize, indexReader.numDocs());
            this.docs = getSortedRandomNum(
                sampleSize,
                indexReader.numDocs(),
                Math.max(sampleSize, MAX_ATTEMPTS_TO_GENERATE_RANDOM_SAMPLES)
            );
            this.iter = 0;
            this.leaf = -1;
            this.sourceLookup = new SourceLookup();
            if (hasNext()) {
                setNextLeaf();
            }
        }

        @Override
        public boolean hasNext() {
            return iter < docs.length && leaf < indexReader.leaves().size();
        }

        /**
         * Ensure hasNext() is called before calling next()
         */
        @Override
        public List<Object> next() {
            int docID = docs[iter] - indexReader.leaves().get(leaf).docBase;
            if (docID >= indexReader.leaves().get(leaf).reader().numDocs()) {
                setNextLeaf();
            }
            // deleted docs may also be sampled to infer the type, which should be acceptable
            sourceLookup.setSegmentAndDocument(indexReader.leaves().get(leaf), docs[iter] - indexReader.leaves().get(leaf).docBase);
            try {
                iter++;
                return valueFetcher.fetchValues(sourceLookup);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        private void setNextLeaf() {
            int readerIndex = ReaderUtil.subIndex(docs[iter], indexReader.leaves());
            if (readerIndex != leaf) {
                leaf = readerIndex;
            } else {
                // this will only happen when leaves are exhausted and readerIndex is indexReader.leaves().size() - 1
                leaf++;
            }
            if (leaf < indexReader.leaves().size()) {
                valueFetcher.setNextReader(indexReader.leaves().get(leaf));
            }
        }

        private static int[] getSortedRandomNum(int sampleSize, int upperBound, int attempts) {
            Set<Integer> generatedNumbers = new TreeSet<>();
            Random random = Randomness.get();
            int itr = 0;
            if (upperBound <= 10 * sampleSize) {
                List<Integer> numberList = new ArrayList<>();
                for (int i = 0; i < upperBound; i++) {
                    numberList.add(i);
                }
                Collections.shuffle(numberList, random);
                generatedNumbers.addAll(numberList.subList(0, sampleSize));
            } else {
                while (generatedNumbers.size() < sampleSize && itr++ < attempts) {
                    int randomNumber = random.nextInt(upperBound);
                    generatedNumbers.add(randomNumber);
                }
            }
            return generatedNumbers.stream().mapToInt(Integer::valueOf).toArray();
        }
    }
}
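
The sample-size bound in the class javadoc above is easy to check numerically. The sketch below is not part of the commit (the class and method names are illustrative); it evaluates P = 1 - C(R, S) / C(R + G, S) as a running product, so no large binomial coefficient is ever materialized, and it reproduces the javadoc's example: S = 149 gives P ≈ 0.95 for 10^7 documents with the field present in 2% of them.

// Hedged sketch, not part of the commit: numerically checks the sample-size
// bound from the FieldTypeInference javadoc.
public final class SampleSizeCheck {

    // P = 1 - C(r, s) / C(r + g, s), computed as 1 - prod_{i=0}^{s-1} (r - i) / (n - i)
    // where n = r + g, so the binomial coefficients never overflow.
    static double hitProbability(long r, long g, int s) {
        long n = r + g;
        double miss = 1.0;
        for (int i = 0; i < s; i++) {
            miss *= (double) (r - i) / (n - i);
        }
        return 1.0 - miss;
    }

    public static void main(String[] args) {
        long total = 10_000_000L;          // 10^7 documents
        long green = total / 50;           // field present in 2% of them
        long red = total - green;          // field absent in the rest
        System.out.println(hitProbability(red, green, 149)); // ~0.950
        System.out.println(hitProbability(red, green, 150)); // DEFAULT_SAMPLE_SIZE: ~0.952
    }
}

Consistent with this, the default sample size of 150 keeps the probability of missing a field that is present in 2% of a 10-million-document index at roughly 5%.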
FieldTypeInferenceTests.java
@@ -0,0 +1,210 @@

/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.mapper;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.core.index.Index;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.search.lookup.SourceLookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.mockito.Mockito.when;

public class FieldTypeInferenceTests extends MapperServiceTestCase {

    private static final Map<String, List<Object>> documentMap;
    static {
        List<Object> listWithNull = new ArrayList<>();
        listWithNull.add(null);
        documentMap = new HashMap<>();
        documentMap.put("text_field", List.of("The quick brown fox jumps over the lazy dog."));
        documentMap.put("int_field", List.of(789));
        documentMap.put("float_field", List.of(123.45));
        documentMap.put("date_field_1", List.of("2024-05-12T15:45:00Z"));
        documentMap.put("date_field_2", List.of("2024-05-12"));
        documentMap.put("boolean_field", List.of(true));
        documentMap.put("null_field", listWithNull);
        documentMap.put("array_field_int", List.of(100, 200, 300, 400, 500));
        documentMap.put("array_field_text", List.of("100", "200"));
        documentMap.put("object_type", List.of(Map.of("foo", Map.of("bar", 10))));
    }

    public void testJsonSupportedTypes() throws IOException {
        MapperService mapperService = createMapperService(topMapping(b -> {}));
        QueryShardContext queryShardContext = createQueryShardContext(mapperService);
        when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid"));
        int totalDocs = 10000;
        int docsPerLeafCount = 1000;
        try (Directory dir = newDirectory()) {
            IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
            Document d = new Document();
            for (int i = 0; i < totalDocs; i++) {
                iw.addDocument(d);
                if ((i + 1) % docsPerLeafCount == 0) {
                    iw.commit();
                }
            }
            try (IndexReader reader = DirectoryReader.open(iw)) {
                iw.close();
                FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader);
                String[] fieldName = { "text_field" };
                Mapper mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertEquals("text", mapper.typeName());

                fieldName[0] = "int_field";
                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertEquals("long", mapper.typeName());

                fieldName[0] = "float_field";
                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertEquals("float", mapper.typeName());

                fieldName[0] = "date_field_1";
                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertEquals("date", mapper.typeName());

                fieldName[0] = "date_field_2";
                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertEquals("date", mapper.typeName());

                fieldName[0] = "boolean_field";
                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertEquals("boolean", mapper.typeName());

                fieldName[0] = "array_field_int";
                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertEquals("long", mapper.typeName());

                fieldName[0] = "array_field_text";
                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertEquals("text", mapper.typeName());

                fieldName[0] = "object_type";
                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertEquals("object", mapper.typeName());

                fieldName[0] = "null_field";
                mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertNull(mapper);

                // If the field is missing, ensure the sample docIDs generated for inference are ordered and in bounds
                fieldName[0] = "missing_field";
                List<List<Integer>> docsEvaluated = new ArrayList<>();
                int[] totalDocsEvaluated = { 0 };
                typeInference.setSampleSize(50);
                mapper = typeInference.infer(new ValueFetcher() {
                    @Override
                    public List<Object> fetchValues(SourceLookup lookup) throws IOException {
                        docsEvaluated.get(docsEvaluated.size() - 1).add(lookup.docId());
                        totalDocsEvaluated[0]++;
                        return documentMap.get(fieldName[0]);
                    }

                    @Override
                    public void setNextReader(LeafReaderContext leafReaderContext) {
                        docsEvaluated.add(new ArrayList<>());
                    }
                });
                assertNull(mapper);
                assertEquals(typeInference.getSampleSize(), totalDocsEvaluated[0]);
                for (List<Integer> docsPerLeaf : docsEvaluated) {
                    for (int j = 0; j < docsPerLeaf.size() - 1; j++) {
                        assertTrue(docsPerLeaf.get(j) < docsPerLeaf.get(j + 1));
                    }
                    if (!docsPerLeaf.isEmpty()) {
                        assertTrue(docsPerLeaf.get(0) >= 0 && docsPerLeaf.get(docsPerLeaf.size() - 1) < docsPerLeafCount);
                    }
                }
            }
        }
    }

    public void testDeleteAllDocs() throws IOException {
        MapperService mapperService = createMapperService(topMapping(b -> {}));
        QueryShardContext queryShardContext = createQueryShardContext(mapperService);
        when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid"));
        int totalDocs = 10000;
        int docsPerLeafCount = 1000;
        try (Directory dir = newDirectory()) {
            IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
            Document d = new Document();
            for (int i = 0; i < totalDocs; i++) {
                iw.addDocument(d);
                if ((i + 1) % docsPerLeafCount == 0) {
                    iw.commit();
                }
            }
            iw.deleteAll();
            iw.commit();

            try (IndexReader reader = DirectoryReader.open(iw)) {
                iw.close();
                FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader);
                String[] fieldName = { "text_field" };
                Mapper mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertNull(mapper);
            }
        }
    }

    public void testZeroDoc() throws IOException {
        MapperService mapperService = createMapperService(topMapping(b -> {}));
        QueryShardContext queryShardContext = createQueryShardContext(mapperService);
        when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid"));
        try (Directory dir = newDirectory()) {
            IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
            try (IndexReader reader = DirectoryReader.open(iw)) {
                iw.close();
                FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader);
                String[] fieldName = { "text_field" };
                Mapper mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
                assertNull(mapper);
            }
        }
    }

    public void testSampleGeneration() throws IOException {
        MapperService mapperService = createMapperService(topMapping(b -> {}));
        QueryShardContext queryShardContext = createQueryShardContext(mapperService);
        when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid"));
        int totalDocs = 10000;
        int docsPerLeafCount = 1000;
        try (Directory dir = newDirectory()) {
            IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
            Document d = new Document();
            for (int i = 0; i < totalDocs; i++) {
                iw.addDocument(d);
                if ((i + 1) % docsPerLeafCount == 0) {
                    iw.commit();
                }
            }
            try (IndexReader reader = DirectoryReader.open(iw)) {
                iw.close();
                FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader);
                typeInference.setSampleSize(1000 - 1);
                typeInference.infer(lookup -> documentMap.get("unknown_field"));
                assertThrows(IllegalArgumentException.class, () -> typeInference.setSampleSize(1000 + 1));
                typeInference.setSampleSize(1000);
                typeInference.infer(lookup -> documentMap.get("unknown_field"));
            }
        }
    }
}
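
For orientation, here is a hedged caller-side sketch of the API these tests exercise. It is not code from the commit; the method name and the fetcher wiring are illustrative, since how a derived field resolves its value from _source depends on the calling context.

// Hedged usage sketch, not part of the commit. Assumes an open IndexReader and
// the index's MapperService; "fetcher" stands for whatever ValueFetcher resolves
// the derived field's value from a sampled document's _source.
static String inferDerivedFieldType(String indexName, MapperService mapperService,
                                    IndexReader reader, ValueFetcher fetcher) throws IOException {
    FieldTypeInference typeInference = new FieldTypeInference(indexName, mapperService, reader);
    typeInference.setSampleSize(200); // optional; defaults to 150, must not exceed 1000
    Mapper inferred = typeInference.infer(fetcher);
    // infer() returns null when no sampled document held a non-null value for the field.
    return inferred == null ? null : inferred.typeName(); // e.g. "text", "long", "date"
}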
