Skip to content

Commit fe4a98d

Browse files
authored
Validate Search Phase Request Processors in Search Pipeline (opensearch-project#18363)
--------- Signed-off-by: Owais <[email protected]>
1 parent 22a6194 commit fe4a98d

File tree

2 files changed

+31
-13
lines changed

2 files changed

+31
-13
lines changed

server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -311,25 +311,27 @@ void validatePipeline(Map<DiscoveryNode, SearchPipelineInfo> searchPipelineInfos
311311
new Processor.PipelineContext(Processor.PipelineSource.VALIDATE_PIPELINE)
312312
);
313313
List<Exception> exceptions = new ArrayList<>();
314-
for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) {
315-
for (Map.Entry<DiscoveryNode, SearchPipelineInfo> entry : searchPipelineInfos.entrySet()) {
316-
String type = processor.getType();
317-
if (entry.getValue().containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, type) == false) {
318-
String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]";
319-
exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message));
320-
}
321-
}
322-
}
323-
for (SearchResponseProcessor processor : pipeline.getSearchResponseProcessors()) {
314+
validateProcessors(searchPipelineInfos, exceptions, Pipeline.REQUEST_PROCESSORS_KEY, pipeline.getSearchRequestProcessors());
315+
validateProcessors(searchPipelineInfos, exceptions, Pipeline.RESPONSE_PROCESSORS_KEY, pipeline.getSearchResponseProcessors());
316+
validateProcessors(searchPipelineInfos, exceptions, Pipeline.PHASE_PROCESSORS_KEY, pipeline.getSearchPhaseResultsProcessors());
317+
ExceptionsHelper.rethrowAndSuppress(exceptions);
318+
}
319+
320+
private void validateProcessors(
321+
Map<DiscoveryNode, SearchPipelineInfo> searchPipelineInfos,
322+
List<Exception> exceptions,
323+
String processorKey,
324+
List<? extends Processor> processors
325+
) {
326+
for (Processor processor : processors) {
324327
for (Map.Entry<DiscoveryNode, SearchPipelineInfo> entry : searchPipelineInfos.entrySet()) {
325328
String type = processor.getType();
326-
if (entry.getValue().containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, type) == false) {
329+
if (entry.getValue().containsProcessor(processorKey, type) == false) {
327330
String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]";
328331
exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message));
329332
}
330333
}
331334
}
332-
ExceptionsHelper.rethrowAndSuppress(exceptions);
333335
}
334336

335337
public void deletePipeline(DeleteSearchPipelineRequest request, ActionListener<AcknowledgedResponse> listener) throws Exception {
@@ -460,6 +462,10 @@ Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessorFact
460462
return responseProcessorFactories;
461463
}
462464

465+
Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessorFactories() {
466+
return phaseInjectorProcessorFactories;
467+
}
468+
463469
@Override
464470
public SearchPipelineInfo info() {
465471
List<ProcessorInfo> requestProcessorInfoList = requestProcessorFactories.keySet()

server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ public void testSearchPipelinePlugin() {
146146
.getResponseProcessorFactories();
147147
assertEquals(1, responseProcessorFactories.size());
148148
assertTrue(responseProcessorFactories.containsKey("bar"));
149+
Map<String, Processor.Factory<SearchPhaseResultsProcessor>> phaseInjectorProcessorFactories = searchPipelineService
150+
.getSearchPhaseResultsProcessorFactories();
151+
assertEquals(1, phaseInjectorProcessorFactories.size());
152+
assertTrue(phaseInjectorProcessorFactories.containsKey("zoe"));
149153
}
150154

151155
public void testSearchPipelinePluginDuplicate() {
@@ -879,6 +883,7 @@ public void testValidatePipeline() throws Exception {
879883

880884
ProcessorInfo reqProcessor = new ProcessorInfo("scale_request_size");
881885
ProcessorInfo rspProcessor = new ProcessorInfo("fixed_score");
886+
ProcessorInfo searchPhaseProcessor = new ProcessorInfo("max_score");
882887
DiscoveryNode n1 = new DiscoveryNode("n1", buildNewFakeTransportAddress(), Version.CURRENT);
883888
DiscoveryNode n2 = new DiscoveryNode("n2", buildNewFakeTransportAddress(), Version.CURRENT);
884889
PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(
@@ -901,7 +906,14 @@ public void testValidatePipeline() throws Exception {
901906
);
902907

903908
SearchPipelineInfo completePipelineInfo = new SearchPipelineInfo(
904-
Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor), Pipeline.RESPONSE_PROCESSORS_KEY, List.of(rspProcessor))
909+
Map.of(
910+
Pipeline.REQUEST_PROCESSORS_KEY,
911+
List.of(reqProcessor),
912+
Pipeline.RESPONSE_PROCESSORS_KEY,
913+
List.of(rspProcessor),
914+
Pipeline.PHASE_PROCESSORS_KEY,
915+
List.of(searchPhaseProcessor)
916+
)
905917
);
906918
SearchPipelineInfo incompletePipelineInfo = new SearchPipelineInfo(Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor)));
907919
// One node is missing a processor

0 commit comments

Comments
 (0)