Skip to content

Introduce system ingest pipeline. #17817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))
- Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189))
- Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908))
- Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817))

### Changed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testFinalPipelineCantChangeDestination() {
IllegalStateException.class,
() -> client().prepareIndex("index").setId("1").setSource(Collections.singletonMap("field", "value")).get()
);
assertThat(e, hasToString(containsString("final pipeline [final_pipeline] can't change the target index")));
assertThat(e, hasToString(containsString("FINAL pipeline [final_pipeline] can't change the target index")));
}

public void testFinalPipelineOfOldDestinationIsNotInvoked() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
boolean indexRequestHasPipeline = IngestService.resolvePipelines(actionRequest, indexRequest, metadata);
boolean indexRequestHasPipeline = ingestService.resolvePipelines(actionRequest, indexRequest, metadata);
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement

private String pipeline;
private String finalPipeline;
private String systemIngestPipeline;

private boolean isPipelineResolved;

Expand Down Expand Up @@ -158,6 +159,9 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
finalPipeline = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_3_1_0)) {
systemIngestPipeline = in.readOptionalString();
}
isPipelineResolved = in.readBoolean();
isRetry = in.readBoolean();
autoGeneratedTimestamp = in.readLong();
Expand Down Expand Up @@ -314,6 +318,21 @@ public String getPipeline() {
return this.pipeline;
}

/**
 * Assigns the system ingest pipeline that is to be executed before the document is indexed.
 *
 * @param systemIngestPipeline the system ingest pipeline value to store; it is kept as given, without validation
 * @return this request, so that calls can be chained
 */
public IndexRequest setSystemIngestPipeline(String systemIngestPipeline) {
    this.systemIngestPipeline = systemIngestPipeline;
    return this;
}

/**
 * Provides the system ingest pipeline that is to be executed before the document is indexed.
 *
 * @return the stored system ingest pipeline value; may be {@code null} when none has been set
 */
public String getSystemIngestPipeline() {
    return systemIngestPipeline;
}

/**
* Sets the final ingest pipeline to be executed before indexing the document.
*
Expand Down Expand Up @@ -668,6 +687,9 @@ private void writeBody(StreamOutput out) throws IOException {
out.writeByte(versionType.getValue());
out.writeOptionalString(pipeline);
out.writeOptionalString(finalPipeline);
if (out.getVersion().onOrAfter(Version.V_3_1_0)) {
out.writeOptionalString(systemIngestPipeline);
}
out.writeBoolean(isPipelineResolved);
out.writeBoolean(isRetry);
out.writeLong(autoGeneratedTimestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterService.USER_DEFINED_METADATA,
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS,
IngestService.SYSTEM_INGEST_PIPELINE_ENABLED,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest;

import java.util.Map;

/**
* Abstract base class for batch system generated processors.
*
* System processors should not be used in the regular ingest pipelines.
*
* @opensearch.internal
*/
public abstract class AbstractBatchingSystemProcessor extends AbstractBatchingProcessor {
protected AbstractBatchingSystemProcessor(String tag, String description, int batchSize) {
super(tag, description, batchSize);
}

@Override
public boolean isSystemGenerated() {
return true;

Check warning on line 27 in server/src/main/java/org/opensearch/ingest/AbstractBatchingSystemProcessor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/ingest/AbstractBatchingSystemProcessor.java#L27

Added line #L27 was not covered by tests
}

/**
* Factory class for creating {@link AbstractBatchingSystemProcessor} instances systematically.
*
* Since the processor config is generated based on the index config so the batch size info should also be defined
* as part of it. And different processors can have their own logic to decide the batch size so let each
* implementation of the newProcessor to handle it.
*
* @opensearch.internal
*/
public abstract static class Factory implements Processor.Factory {
final String processorType;

protected Factory(String processorType) {
this.processorType = processorType;
}

@Override
public boolean isSystemGenerated() {
return true;
}

/**
* Creates a new processor instance. It will be invoked systematically.
*
* @param processorFactories The processor factories.
* @param tag The processor tag.
* @param description The processor description.
* @param config The processor configuration.
* @return The new AbstractBatchProcessor instance.
* @throws Exception If the processor could not be created.
*/
@Override
public AbstractBatchingSystemProcessor create(
Map<String, Processor.Factory> processorFactories,
String tag,
String description,
Map<String, Object> config
) throws Exception {
return newProcessor(tag, description, config);
}

/**
* Returns a new processor instance. It will be invoked systematically.
*
* @param tag tag of the processor
* @param description description of the processor
* @param config configuration of the processor
* @return a new batch processor instance
*/
protected abstract AbstractBatchingSystemProcessor newProcessor(String tag, String description, Map<String, Object> config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.index.IndexRequest;

import java.util.List;

/**
 * Bundles an index request with the data needed to run its ingest pipelines.
 */
public class IndexRequestWrapper {
    /**
     * Position of the request within the list of requests. It lets an ingested result
     * or an exception be mapped back to the right index request.
     */
    private final int slot;
    /** The index request being wrapped. */
    private final IndexRequest indexRequest;
    /** The action request supplied together with the index request. */
    private final DocWriteRequest<?> actionRequest;
    /** The ingest pipeline infos supplied for this request. */
    private final List<IngestPipelineInfo> pipelineInfoList;

    /**
     * Creates a wrapper holding the given values as-is.
     *
     * @param slot             index of the request in the list of requests
     * @param indexRequest     the index request to wrap
     * @param actionRequest    the action request supplied with the index request
     * @param pipelineInfoList the ingest pipeline infos for the request
     */
    public IndexRequestWrapper(
        int slot,
        IndexRequest indexRequest,
        DocWriteRequest<?> actionRequest,
        List<IngestPipelineInfo> pipelineInfoList
    ) {
        this.pipelineInfoList = pipelineInfoList;
        this.actionRequest = actionRequest;
        this.indexRequest = indexRequest;
        this.slot = slot;
    }

    /**
     * @return the index of the request in the list of requests
     */
    public int getSlot() {
        return this.slot;
    }

    /**
     * @return the wrapped index request
     */
    public IndexRequest getIndexRequest() {
        return this.indexRequest;
    }

    /**
     * @return the action request supplied at construction
     */
    public DocWriteRequest<?> getActionRequest() {
        return this.actionRequest;
    }

    /**
     * @return the same list instance of ingest pipeline infos that was supplied at construction
     */
    public List<IngestPipelineInfo> getIngestPipelineInfoList() {
        return this.pipelineInfoList;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest;

import reactor.util.annotation.NonNull;

/**
 * Holds the id and the type of an ingest pipeline.
 */
public class IngestPipelineInfo {
    private final String pipelineId;
    private final IngestPipelineType type;

    /**
     * Creates an info object holding the given id and type as-is.
     *
     * @param pipelineId the pipeline id
     * @param type       the pipeline type
     */
    public IngestPipelineInfo(@NonNull final String pipelineId, @NonNull final IngestPipelineType type) {
        this.type = type;
        this.pipelineId = pipelineId;
    }

    /**
     * @return the pipeline id supplied at construction
     */
    public String getPipelineId() {
        return this.pipelineId;
    }

    /**
     * @return the pipeline type supplied at construction
     */
    public IngestPipelineType getType() {
        return this.type;
    }

    /**
     * Renders this info as the pipeline id and the type name separated by a colon.
     *
     * @return a string of the form {@code <pipelineId>:<TYPE_NAME>}
     */
    @Override
    public String toString() {
        return String.join(":", pipelineId, type.name());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest;

/**
 * Enumerates the kinds of ingest pipeline.
 */
public enum IngestPipelineType {

    /**
     * The pipeline provided through the index request, or the one defined in the
     * index settings as the default pipeline.
     */
    DEFAULT,

    /**
     * The pipeline defined in the index settings as the final pipeline.
     */
    FINAL,

    /**
     * A systematically generated pipeline which will be executed after the
     * user defined final pipeline.
     */
    SYSTEM_FINAL;
}
Loading
Loading