Skip to content

Commit b62d654

Browse files
committed
Add stats tracking for semantic field
Signed-off-by: Bo Zhang <[email protected]>
1 parent 8068294 commit b62d654

File tree

12 files changed

+198
-118
lines changed

12 files changed

+198
-118
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2222
- [Stats] Add stats for text embedding processor with different settings ([#1332](https://github.com/opensearch-project/neural-search/pull/1332))
2323
- Validate that model id and analyzer are not provided at the same time for the neural sparse query ([#1359](https://github.com/opensearch-project/neural-search/pull/1359))
2424
- [Stats] Add stats for score based and rank based normalization processors ([#1326](https://github.com/opensearch-project/neural-search/pull/1326))
25+
- [Stats] Add stats tracking for semantic field ([#1362](https://github.com/opensearch-project/neural-search/pull/1362))
2526

2627
### Bug Fixes
2728
- Fix score value as null for single shard when sorting is not done on score field ([#1277](https://github.com/opensearch-project/neural-search/pull/1277))

qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/rolling/RestNeuralStatsActionIT.java

Lines changed: 87 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,6 @@
44
*/
55
package org.opensearch.neuralsearch.bwc.rolling;
66

7-
import org.opensearch.neuralsearch.stats.events.EventStatName;
8-
import org.opensearch.neuralsearch.stats.info.InfoStatName;
9-
10-
import java.nio.file.Files;
11-
import java.nio.file.Path;
12-
import java.util.ArrayList;
13-
import java.util.Map;
14-
15-
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
16-
import static org.opensearch.neuralsearch.util.TestUtils.TEXT_EMBEDDING_PROCESSOR;
17-
import static org.opensearch.neuralsearch.util.TestUtils.getModelId;
18-
197
public class RestNeuralStatsActionIT extends AbstractRollingUpgradeTestCase {
208
private static final String PIPELINE_NAME = "nlp-pipeline-stats";
219
private static final String TEST_FIELD = "passage_text";
@@ -32,91 +20,91 @@ public class RestNeuralStatsActionIT extends AbstractRollingUpgradeTestCase {
3220
// TODO: There is a bug in the stats API which needs to be fixed before enabling the following tests
3321
// https://github.com/opensearch-project/neural-search/issues/1368
3422

35-
// public void testStats_E2EFlow() throws Exception {
36-
//
37-
// waitForClusterHealthGreen(NODES_BWC_CLUSTER, 90);
38-
// updateClusterSettings("plugins.neural_search.stats_enabled", true);
39-
//
40-
// // Get initial stats
41-
// String responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
42-
// logger.info("Initial:" + responseBody);
43-
// Map<String, Object> infoStats = parseInfoStatsResponse(responseBody);
44-
// Map<String, Object> aggregatedNodeStats = parseAggregatedNodeStatsResponse(responseBody);
45-
//
46-
// int numberOfExecution = (int) getNestedValue(aggregatedNodeStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS);
47-
// int numberOfProcessor = (int) getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS);
48-
//
49-
// switch (getClusterType()) {
50-
// case OLD:
51-
// modelId = uploadTextEmbeddingModel();
52-
// loadModel(modelId);
53-
// createPipelineProcessor(modelId, PIPELINE_NAME);
54-
// createIndexWithConfiguration(
55-
// getIndexNameForTest(),
56-
// Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())),
57-
// PIPELINE_NAME
58-
// );
59-
// addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null);
60-
// addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT, null, null);
61-
// addDocument(getIndexNameForTest(), "2", TEST_FIELD, TEXT, null, null);
62-
//
63-
// responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
64-
// logger.info("Old after insert:" + responseBody);
65-
// assertEquals(
66-
// numberOfExecution + 3,
67-
// getNestedValue(parseAggregatedNodeStatsResponse(responseBody), EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS)
68-
// );
69-
// assertEquals(
70-
// numberOfProcessor + 1,
71-
// getNestedValue(parseInfoStatsResponse(responseBody), InfoStatName.TEXT_EMBEDDING_PROCESSORS)
72-
// );
73-
// break;
74-
// case MIXED:
75-
// modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR);
76-
// loadModel(modelId);
77-
// addDocument(getIndexNameForTest(), "3", TEST_FIELD, TEXT_MIXED, null, null);
78-
// addDocument(getIndexNameForTest(), "4", TEST_FIELD, TEXT_MIXED, null, null);
79-
// addDocument(getIndexNameForTest(), "5", TEST_FIELD, TEXT_MIXED, null, null);
80-
//
81-
// // Get stats
82-
// responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
83-
// logger.info("Mixed after insert:" + responseBody);
84-
//
85-
// assertEquals(
86-
// numberOfExecution + 3,
87-
// getNestedValue(parseAggregatedNodeStatsResponse(responseBody), EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS)
88-
// );
89-
// assertEquals(
90-
// numberOfProcessor,
91-
// getNestedValue(parseInfoStatsResponse(responseBody), InfoStatName.TEXT_EMBEDDING_PROCESSORS)
92-
// );
93-
// break;
94-
// case UPGRADED:
95-
// try {
96-
// modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR);
97-
// loadModel(modelId);
98-
// addDocument(getIndexNameForTest(), "6", TEST_FIELD, TEXT_UPGRADED, null, null);
99-
// addDocument(getIndexNameForTest(), "7", TEST_FIELD, TEXT_UPGRADED, null, null);
100-
// addDocument(getIndexNameForTest(), "8", TEST_FIELD, TEXT_UPGRADED, null, null);
101-
//
102-
// // Get stats
103-
// responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
104-
// logger.info("Upgraded after insert:" + responseBody);
105-
//
106-
// assertEquals(
107-
// numberOfExecution + 3,
108-
// getNestedValue(parseAggregatedNodeStatsResponse(responseBody), EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS)
109-
// );
110-
// assertEquals(
111-
// numberOfProcessor,
112-
// getNestedValue(parseInfoStatsResponse(responseBody), InfoStatName.TEXT_EMBEDDING_PROCESSORS)
113-
// );
114-
// } finally {
115-
// wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
116-
// }
117-
// break;
118-
// default:
119-
// throw new IllegalStateException("Unexpected value: " + getClusterType());
120-
// }
121-
// }
23+
// public void testStats_E2EFlow() throws Exception {
24+
//
25+
// waitForClusterHealthGreen(NODES_BWC_CLUSTER, 90);
26+
// updateClusterSettings("plugins.neural_search.stats_enabled", true);
27+
//
28+
// // Get initial stats
29+
// String responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
30+
// logger.info("Initial:" + responseBody);
31+
// Map<String, Object> infoStats = parseInfoStatsResponse(responseBody);
32+
// Map<String, Object> aggregatedNodeStats = parseAggregatedNodeStatsResponse(responseBody);
33+
//
34+
// int numberOfExecution = (int) getNestedValue(aggregatedNodeStats, EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS);
35+
// int numberOfProcessor = (int) getNestedValue(infoStats, InfoStatName.TEXT_EMBEDDING_PROCESSORS);
36+
//
37+
// switch (getClusterType()) {
38+
// case OLD:
39+
// modelId = uploadTextEmbeddingModel();
40+
// loadModel(modelId);
41+
// createPipelineProcessor(modelId, PIPELINE_NAME);
42+
// createIndexWithConfiguration(
43+
// getIndexNameForTest(),
44+
// Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())),
45+
// PIPELINE_NAME
46+
// );
47+
// addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null);
48+
// addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT, null, null);
49+
// addDocument(getIndexNameForTest(), "2", TEST_FIELD, TEXT, null, null);
50+
//
51+
// responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
52+
// logger.info("Old after insert:" + responseBody);
53+
// assertEquals(
54+
// numberOfExecution + 3,
55+
// getNestedValue(parseAggregatedNodeStatsResponse(responseBody), EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS)
56+
// );
57+
// assertEquals(
58+
// numberOfProcessor + 1,
59+
// getNestedValue(parseInfoStatsResponse(responseBody), InfoStatName.TEXT_EMBEDDING_PROCESSORS)
60+
// );
61+
// break;
62+
// case MIXED:
63+
// modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR);
64+
// loadModel(modelId);
65+
// addDocument(getIndexNameForTest(), "3", TEST_FIELD, TEXT_MIXED, null, null);
66+
// addDocument(getIndexNameForTest(), "4", TEST_FIELD, TEXT_MIXED, null, null);
67+
// addDocument(getIndexNameForTest(), "5", TEST_FIELD, TEXT_MIXED, null, null);
68+
//
69+
// // Get stats
70+
// responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
71+
// logger.info("Mixed after insert:" + responseBody);
72+
//
73+
// assertEquals(
74+
// numberOfExecution + 3,
75+
// getNestedValue(parseAggregatedNodeStatsResponse(responseBody), EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS)
76+
// );
77+
// assertEquals(
78+
// numberOfProcessor,
79+
// getNestedValue(parseInfoStatsResponse(responseBody), InfoStatName.TEXT_EMBEDDING_PROCESSORS)
80+
// );
81+
// break;
82+
// case UPGRADED:
83+
// try {
84+
// modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR);
85+
// loadModel(modelId);
86+
// addDocument(getIndexNameForTest(), "6", TEST_FIELD, TEXT_UPGRADED, null, null);
87+
// addDocument(getIndexNameForTest(), "7", TEST_FIELD, TEXT_UPGRADED, null, null);
88+
// addDocument(getIndexNameForTest(), "8", TEST_FIELD, TEXT_UPGRADED, null, null);
89+
//
90+
// // Get stats
91+
// responseBody = executeNeuralStatRequest(new ArrayList<>(), new ArrayList<>());
92+
// logger.info("Upgraded after insert:" + responseBody);
93+
//
94+
// assertEquals(
95+
// numberOfExecution + 3,
96+
// getNestedValue(parseAggregatedNodeStatsResponse(responseBody), EventStatName.TEXT_EMBEDDING_PROCESSOR_EXECUTIONS)
97+
// );
98+
// assertEquals(
99+
// numberOfProcessor,
100+
// getNestedValue(parseInfoStatsResponse(responseBody), InfoStatName.TEXT_EMBEDDING_PROCESSORS)
101+
// );
102+
// } finally {
103+
// wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
104+
// }
105+
// break;
106+
// default:
107+
// throw new IllegalStateException("Unexpected value: " + getClusterType());
108+
// }
109+
// }
122110
}

src/main/java/org/opensearch/neuralsearch/processor/semantic/SemanticFieldProcessor.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.opensearch.neuralsearch.processor.chunker.Chunker;
2121
import org.opensearch.neuralsearch.processor.chunker.FixedTokenLengthChunker;
2222
import org.opensearch.neuralsearch.processor.dto.SemanticFieldInfo;
23+
import org.opensearch.neuralsearch.stats.events.EventStatName;
24+
import org.opensearch.neuralsearch.stats.events.EventStatsManager;
2325
import org.opensearch.neuralsearch.util.TokenWeightUtil;
2426
import org.opensearch.neuralsearch.util.prune.PruneType;
2527
import org.opensearch.neuralsearch.util.prune.PruneUtils;
@@ -118,6 +120,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
118120
*/
119121
@Override
120122
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
123+
EventStatsManager.increment(EventStatName.SEMANTIC_FIELD_PROCESSOR_EXECUTIONS);
121124
try {
122125
unflattenIngestDoc(ingestDocument);
123126
// Collect all the semantic field info based on the path of semantic fields found in the index mapping
@@ -173,7 +176,10 @@ private void process(
173176
) {
174177
setModelInfo(ingestDocument, semanticFieldInfoList);
175178

176-
chunk(ingestDocument, semanticFieldInfoList);
179+
boolean isChunked = chunk(ingestDocument, semanticFieldInfoList);
180+
if (isChunked) {
181+
EventStatsManager.increment(EventStatName.SEMANTIC_FIELD_PROCESSOR_CHUNKING_EXECUTIONS);
182+
}
177183

178184
generateAndSetEmbedding(ingestDocument, semanticFieldInfoList, handler);
179185
}
@@ -277,12 +283,13 @@ private List<SemanticFieldInfo> getSemanticFieldInfo(IngestDocument ingestDocume
277283
return semanticFieldInfos;
278284
}
279285

280-
private void chunk(@NonNull final IngestDocument ingestDocument, @NonNull final List<SemanticFieldInfo> semanticFieldInfoList) {
286+
private boolean chunk(@NonNull final IngestDocument ingestDocument, @NonNull final List<SemanticFieldInfo> semanticFieldInfoList) {
281287
final Map<String, Object> sourceAndMetadataMap = ingestDocument.getSourceAndMetadata();
282288
int maxTokenCount = getMaxTokenCount(sourceAndMetadataMap, environment.settings(), clusterService);
283-
289+
boolean isChunked = false;
284290
for (SemanticFieldInfo semanticFieldInfo : semanticFieldInfoList) {
285291
if (semanticFieldInfo.getChunkingEnabled()) {
292+
isChunked = true;
286293
if (semanticFieldInfo.getChunkers() == null || semanticFieldInfo.getChunkers().isEmpty()) {
287294
semanticFieldInfo.setChunkers(List.of(defaultTextChunker));
288295
}
@@ -294,6 +301,7 @@ private void chunk(@NonNull final IngestDocument ingestDocument, @NonNull final
294301
semanticFieldInfo.setChunks(List.of(semanticFieldInfo.getValue()));
295302
}
296303
}
304+
return isChunked;
297305
}
298306

299307
private void setChunkedText(@NonNull final IngestDocument ingestDocument, @NonNull final SemanticFieldInfo semanticFieldInfo) {
@@ -386,6 +394,7 @@ private void collectSemanticFieldInfo(
386394

387395
@Override
388396
public void subBatchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
397+
EventStatsManager.increment(EventStatName.SEMANTIC_FIELD_PROCESSOR_EXECUTIONS);
389398
if (ingestDocumentWrappers == null || ingestDocumentWrappers.isEmpty()) {
390399
handler.accept(ingestDocumentWrappers);
391400
return;
@@ -451,15 +460,17 @@ private void batchProcess(
451460
@NonNull final Map<IngestDocumentWrapper, List<SemanticFieldInfo>> docToSemanticFieldInfoMap,
452461
@NonNull final Consumer<List<IngestDocumentWrapper>> handler
453462
) {
454-
463+
boolean isChunked = false;
455464
for (Map.Entry<IngestDocumentWrapper, List<SemanticFieldInfo>> entry : docToSemanticFieldInfoMap.entrySet()) {
456465
final IngestDocumentWrapper ingestDocumentWrapper = entry.getKey();
457466
final IngestDocument ingestDocument = entry.getKey().getIngestDocument();
458467
final List<SemanticFieldInfo> semanticFieldInfoList = entry.getValue();
459468
try {
460469
setModelInfo(ingestDocument, semanticFieldInfoList);
461470

462-
chunk(ingestDocument, semanticFieldInfoList);
471+
if (chunk(ingestDocument, semanticFieldInfoList)) {
472+
isChunked = true;
473+
}
463474
} catch (Exception e) {
464475
log.error(
465476
String.format(
@@ -476,6 +487,10 @@ private void batchProcess(
476487
}
477488
}
478489

490+
if (isChunked) {
491+
EventStatsManager.increment(EventStatName.SEMANTIC_FIELD_PROCESSOR_CHUNKING_EXECUTIONS);
492+
}
493+
479494
batchGenerateAndSetEmbedding(ingestDocumentWrappers, docToSemanticFieldInfoMap, handler);
480495
}
481496

src/main/java/org/opensearch/neuralsearch/query/NeuralQueryBuilder.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import static org.opensearch.neuralsearch.common.MinClusterVersionUtil.isClusterOnOrAfterMinReqVersionForSemanticFieldType;
1818
import static org.opensearch.neuralsearch.common.VectorUtil.vectorAsListToArray;
1919
import static org.opensearch.neuralsearch.constants.MappingConstants.PATH_SEPARATOR;
20-
import static org.opensearch.neuralsearch.constants.SemanticFieldConstants.DEFAULT_SEMANTIC_INFO_FIELD_NAME_SUFFIX;
2120
import static org.opensearch.neuralsearch.constants.SemanticInfoFieldConstants.EMBEDDING_FIELD_NAME;
2221
import static org.opensearch.neuralsearch.constants.SemanticInfoFieldConstants.CHUNKS_FIELD_NAME;
2322
import static org.opensearch.neuralsearch.processor.TextImageEmbeddingProcessor.INPUT_IMAGE;
@@ -83,6 +82,8 @@
8382
import org.opensearch.neuralsearch.common.MinClusterVersionUtil;
8483
import org.opensearch.neuralsearch.mapper.SemanticFieldMapper;
8584
import org.opensearch.neuralsearch.query.dto.NeuralQueryBuildStage;
85+
import org.opensearch.neuralsearch.stats.events.EventStatName;
86+
import org.opensearch.neuralsearch.stats.events.EventStatsManager;
8687
import org.opensearch.neuralsearch.util.NeuralSearchClusterUtil;
8788

8889
import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor;
@@ -551,6 +552,7 @@ protected void doXContent(XContentBuilder xContentBuilder, Params params) throws
551552
* @throws IOException can be thrown by parser
552553
*/
553554
public static NeuralQueryBuilder fromXContent(XContentParser parser) throws IOException {
555+
EventStatsManager.increment(EventStatName.NEURAL_QUERY_REQUESTS);
554556
final Builder builder = new Builder();
555557
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
556558
throw new ParsingException(parser.getTokenLocation(), "Token must be START_OBJECT");
@@ -708,6 +710,7 @@ private QueryBuilder rewriteQueryForSemanticField(@NonNull final NeuralQueryTarg
708710
final String chunksPath = targetFieldConfig.getChunksPath();
709711

710712
if (KNNVectorFieldMapper.CONTENT_TYPE.equals(embeddingFieldType)) {
713+
EventStatsManager.increment(EventStatName.NEURAL_QUERY_AGAINST_SEMANTIC_DENSE_REQUESTS);
711714
if (modelIdToVectorSupplierMap == null
712715
|| modelIdToVectorSupplierMap.get(searchModelId) == null
713716
|| modelIdToVectorSupplierMap.get(searchModelId).get() == null) {
@@ -723,6 +726,7 @@ private QueryBuilder rewriteQueryForSemanticField(@NonNull final NeuralQueryTarg
723726
return neuralKNNQueryBuilder;
724727
}
725728
} else if (RankFeaturesFieldMapper.CONTENT_TYPE.equals(embeddingFieldType)) {
729+
EventStatsManager.increment(EventStatName.NEURAL_QUERY_AGAINST_SEMANTIC_SPARSE_REQUESTS);
726730
Supplier<Map<String, Float>> queryTokensSupplier = queryTokensMapSupplier;
727731
// If the raw token is not provided or no search analyzer provided
728732
// then try to find the token generated by the ml model
@@ -798,7 +802,7 @@ private QueryBuilder rewriteQueryAgainstKnnField(QueryRewriteContext queryRewrit
798802
if (vectorSupplier().get() == null) {
799803
return this;
800804
}
801-
805+
EventStatsManager.increment(EventStatName.NEURAL_QUERY_AGAINST_KNN_REQUESTS);
802806
return createKNNQueryBuilder(fieldName(), vectorSupplier.get());
803807
}
804808

@@ -977,15 +981,6 @@ private String getErrorMessageWithBaseErrorForSemantic(@NonNull final String err
977981
return "Failed to rewrite the neural query against the semantic field " + fieldName + ". " + errorMessage;
978982
}
979983

980-
private String getSemanticInfoFieldPath(SemanticFieldMapper.SemanticFieldType semanticFieldType) {
981-
final String[] paths = semanticFieldType.name().split("\\.");
982-
final String semanticInfoFieldName = semanticFieldType.getSemanticParameters().getSemanticInfoFieldName();
983-
paths[paths.length - 1] = semanticInfoFieldName == null
984-
? paths[paths.length - 1] + DEFAULT_SEMANTIC_INFO_FIELD_NAME_SUFFIX
985-
: semanticInfoFieldName;
986-
return String.join(PATH_SEPARATOR, paths) + PATH_SEPARATOR + CHUNKS_FIELD_NAME;
987-
}
988-
989984
private QueryBuilder inferenceForSemanticField(
990985
@NonNull final QueryRewriteContext queryRewriteContext,
991986
@NonNull final Set<String> modelIdsFromTargetFields,

0 commit comments

Comments
 (0)