Skip to content

Commit 10bdfee

Browse files
authored
Add SortResponseProcessor to Search Pipelines (#14785) (#14868)
* Add SortResponseProcessor for search pipelines * Add stupid and unnecessary javadocs to satisfy overly strict CI * Split casting and sorting methods for readability * Register the sort processor factory * Address code review comments * Cast individual list elements to avoid creating two lists * Add yamlRestTests * Clarify why there's unusual sorting * Use instanceof instead of isAssignableFrom --------- Signed-off-by: Daniel Widdis <[email protected]>
1 parent 61ca56e commit 10bdfee

File tree

7 files changed

+597
-4
lines changed

7 files changed

+597
-4
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1919
- Print reason why parent task was cancelled ([#14604](https://github.com/opensearch-project/OpenSearch/issues/14604))
2020
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
2121
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
22+
- Refactor remote-routing-table service in line with remote state interfaces ([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
23+
- Add SortResponseProcessor to Search Pipelines ([#14785](https://github.com/opensearch-project/OpenSearch/issues/14785))
2224
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
2325
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
2426
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
9797
new TruncateHitsResponseProcessor.Factory(),
9898
CollapseResponseProcessor.TYPE,
9999
new CollapseResponseProcessor.Factory(),
100-
SplitResponseProcessor.TYPE,
101-
new SplitResponseProcessor.Factory()
100+
SortResponseProcessor.TYPE,
101+
new SortResponseProcessor.Factory()
102102
)
103103
);
104104
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.pipeline.common;
10+
11+
import org.opensearch.action.search.SearchRequest;
12+
import org.opensearch.action.search.SearchResponse;
13+
import org.opensearch.common.collect.Tuple;
14+
import org.opensearch.common.document.DocumentField;
15+
import org.opensearch.common.xcontent.XContentHelper;
16+
import org.opensearch.core.common.bytes.BytesReference;
17+
import org.opensearch.core.xcontent.MediaType;
18+
import org.opensearch.core.xcontent.XContentBuilder;
19+
import org.opensearch.ingest.ConfigurationUtils;
20+
import org.opensearch.search.SearchHit;
21+
import org.opensearch.search.pipeline.AbstractProcessor;
22+
import org.opensearch.search.pipeline.Processor;
23+
import org.opensearch.search.pipeline.SearchResponseProcessor;
24+
25+
import java.util.Comparator;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Objects;
29+
import java.util.stream.Collectors;
30+
31+
/**
32+
* Processor that sorts an array of items.
33+
* Throws exception is the specified field is not an array.
34+
*/
35+
public class SortResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
36+
/** Key to reference this processor type from a search pipeline. */
37+
public static final String TYPE = "sort";
38+
/** Key defining the array field to be sorted. */
39+
public static final String SORT_FIELD = "field";
40+
/** Optional key defining the sort order. */
41+
public static final String SORT_ORDER = "order";
42+
/** Optional key to put the sorted values in a different field. */
43+
public static final String TARGET_FIELD = "target_field";
44+
/** Default sort order if not specified */
45+
public static final String DEFAULT_ORDER = "asc";
46+
47+
/** Enum defining how elements will be sorted */
48+
public enum SortOrder {
49+
/** Sort in ascending (natural) order */
50+
ASCENDING("asc"),
51+
/** Sort in descending (reverse) order */
52+
DESCENDING("desc");
53+
54+
private final String direction;
55+
56+
SortOrder(String direction) {
57+
this.direction = direction;
58+
}
59+
60+
@Override
61+
public String toString() {
62+
return this.direction;
63+
}
64+
65+
/**
66+
* Converts the string representation of the enum value to the enum.
67+
* @param value A string ("asc" or "desc")
68+
* @return the corresponding enum value
69+
*/
70+
public static SortOrder fromString(String value) {
71+
if (value == null) {
72+
throw new IllegalArgumentException("Sort direction cannot be null");
73+
}
74+
75+
if (value.equals(ASCENDING.toString())) {
76+
return ASCENDING;
77+
} else if (value.equals(DESCENDING.toString())) {
78+
return DESCENDING;
79+
}
80+
throw new IllegalArgumentException("Sort direction [" + value + "] not recognized." + " Valid values are: [asc, desc]");
81+
}
82+
}
83+
84+
private final String sortField;
85+
private final SortOrder sortOrder;
86+
private final String targetField;
87+
88+
SortResponseProcessor(
89+
String tag,
90+
String description,
91+
boolean ignoreFailure,
92+
String sortField,
93+
SortOrder sortOrder,
94+
String targetField
95+
) {
96+
super(tag, description, ignoreFailure);
97+
this.sortField = Objects.requireNonNull(sortField);
98+
this.sortOrder = Objects.requireNonNull(sortOrder);
99+
this.targetField = targetField == null ? sortField : targetField;
100+
}
101+
102+
/**
103+
* Getter function for sortField
104+
* @return sortField
105+
*/
106+
public String getSortField() {
107+
return sortField;
108+
}
109+
110+
/**
111+
* Getter function for targetField
112+
* @return targetField
113+
*/
114+
public String getTargetField() {
115+
return targetField;
116+
}
117+
118+
/**
119+
* Getter function for sortOrder
120+
* @return sortOrder
121+
*/
122+
public SortOrder getSortOrder() {
123+
return sortOrder;
124+
}
125+
126+
@Override
127+
public String getType() {
128+
return TYPE;
129+
}
130+
131+
@Override
132+
public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
133+
SearchHit[] hits = response.getHits().getHits();
134+
for (SearchHit hit : hits) {
135+
Map<String, DocumentField> fields = hit.getFields();
136+
if (fields.containsKey(sortField)) {
137+
DocumentField docField = hit.getFields().get(sortField);
138+
if (docField == null) {
139+
throw new IllegalArgumentException("field [" + sortField + "] is null, cannot sort.");
140+
}
141+
hit.setDocumentField(targetField, new DocumentField(targetField, getSortedValues(docField.getValues())));
142+
}
143+
if (hit.hasSource()) {
144+
BytesReference sourceRef = hit.getSourceRef();
145+
Tuple<? extends MediaType, Map<String, Object>> typeAndSourceMap = XContentHelper.convertToMap(
146+
sourceRef,
147+
false,
148+
(MediaType) null
149+
);
150+
151+
Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
152+
if (sourceAsMap.containsKey(sortField)) {
153+
Object val = sourceAsMap.get(sortField);
154+
if (val instanceof List) {
155+
@SuppressWarnings("unchecked")
156+
List<Object> listVal = (List<Object>) val;
157+
sourceAsMap.put(targetField, getSortedValues(listVal));
158+
}
159+
XContentBuilder builder = XContentBuilder.builder(typeAndSourceMap.v1().xContent());
160+
builder.map(sourceAsMap);
161+
hit.sourceRef(BytesReference.bytes(builder));
162+
}
163+
}
164+
}
165+
return response;
166+
}
167+
168+
private List<Object> getSortedValues(List<Object> values) {
169+
return values.stream()
170+
.map(this::downcastToComparable)
171+
.sorted(sortOrder.equals(SortOrder.ASCENDING) ? Comparator.naturalOrder() : Comparator.reverseOrder())
172+
.collect(Collectors.toList());
173+
}
174+
175+
@SuppressWarnings("unchecked")
176+
private Comparable<Object> downcastToComparable(Object obj) {
177+
if (obj instanceof Comparable) {
178+
return (Comparable<Object>) obj;
179+
} else if (obj == null) {
180+
throw new IllegalArgumentException("field [" + sortField + "] contains a null value.]");
181+
} else {
182+
throw new IllegalArgumentException("field [" + sortField + "] of type [" + obj.getClass().getName() + "] is not comparable.]");
183+
}
184+
}
185+
186+
static class Factory implements Processor.Factory<SearchResponseProcessor> {
187+
188+
@Override
189+
public SortResponseProcessor create(
190+
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
191+
String tag,
192+
String description,
193+
boolean ignoreFailure,
194+
Map<String, Object> config,
195+
PipelineContext pipelineContext
196+
) {
197+
String sortField = ConfigurationUtils.readStringProperty(TYPE, tag, config, SORT_FIELD);
198+
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, TARGET_FIELD, sortField);
199+
try {
200+
SortOrder sortOrder = SortOrder.fromString(
201+
ConfigurationUtils.readStringProperty(TYPE, tag, config, SORT_ORDER, DEFAULT_ORDER)
202+
);
203+
return new SortResponseProcessor(tag, description, ignoreFailure, sortField, sortOrder, targetField);
204+
} catch (IllegalArgumentException e) {
205+
throw ConfigurationUtils.newConfigurationException(TYPE, tag, SORT_ORDER, e.getMessage());
206+
}
207+
}
208+
}
209+
}

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SplitResponseProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp
111111
throw new IllegalArgumentException("field [" + splitField + "] is null, cannot split.");
112112
}
113113
Object val = docField.getValue();
114-
if (val == null || !String.class.isAssignableFrom(val.getClass())) {
114+
if (!(val instanceof String)) {
115115
throw new IllegalArgumentException("field [" + splitField + "] is not a string, cannot split");
116116
}
117117
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);

modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePluginTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void testAllowlistNotSpecified() throws IOException {
8282
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
8383
assertEquals(Set.of("oversample", "filter_query", "script"), plugin.getRequestProcessors(createParameters(settings)).keySet());
8484
assertEquals(
85-
Set.of("rename_field", "truncate_hits", "collapse", "split"),
85+
Set.of("rename_field", "truncate_hits", "collapse", "sort"),
8686
plugin.getResponseProcessors(createParameters(settings)).keySet()
8787
);
8888
assertEquals(Set.of(), plugin.getSearchPhaseResultsProcessors(createParameters(settings)).keySet());

0 commit comments

Comments
 (0)