Skip to content

Commit 41ba00a

Browse files
authored
Add limit on number of processors in Ingest pipelines (opensearch-project#15465)
* Add limit on number of processors in Ingest pipelines Signed-off-by: Rai <[email protected]>
1 parent cfcfe21 commit 41ba00a

File tree

7 files changed

+221
-7
lines changed

7 files changed

+221
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3131
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
3232
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
3333
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343)))
34+
- Add limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)).
3435
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
3536
- Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454))
3637
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))

server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.opensearch.core.action.ActionListener;
3737
import org.opensearch.ingest.CompoundProcessor;
3838
import org.opensearch.ingest.IngestDocument;
39+
import org.opensearch.ingest.IngestService;
3940
import org.opensearch.ingest.Pipeline;
4041
import org.opensearch.threadpool.ThreadPool;
4142

@@ -56,9 +57,11 @@ class SimulateExecutionService {
5657
private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;
5758

5859
private final ThreadPool threadPool;
60+
private final IngestService ingestService;
5961

60-
SimulateExecutionService(ThreadPool threadPool) {
62+
SimulateExecutionService(ThreadPool threadPool, IngestService ingestService) {
6163
this.threadPool = threadPool;
64+
this.ingestService = ingestService;
6265
}
6366

6467
void executeDocument(
@@ -91,6 +94,9 @@ void executeDocument(
9194
}
9295

9396
public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
97+
98+
ingestService.validateProcessorCountForIngestPipeline(request.getPipeline());
99+
94100
threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
95101
final AtomicInteger counter = new AtomicInteger();
96102
final List<SimulateDocumentResult> responses = new CopyOnWriteArrayList<>(

server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public SimulatePipelineTransportAction(
6969
(Writeable.Reader<SimulatePipelineRequest>) SimulatePipelineRequest::new
7070
);
7171
this.ingestService = ingestService;
72-
this.executionService = new SimulateExecutionService(threadPool);
72+
this.executionService = new SimulateExecutionService(threadPool, ingestService);
7373
}
7474

7575
@Override

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
133133
import org.opensearch.indices.recovery.RecoverySettings;
134134
import org.opensearch.indices.store.IndicesStore;
135+
import org.opensearch.ingest.IngestService;
135136
import org.opensearch.monitor.fs.FsHealthService;
136137
import org.opensearch.monitor.fs.FsService;
137138
import org.opensearch.monitor.jvm.JvmGcMonitorService;
@@ -406,6 +407,7 @@ public void apply(Settings value, Settings current, Settings previous) {
406407
ClusterService.USER_DEFINED_METADATA,
407408
ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
408409
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
410+
IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS,
409411
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
410412
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
411413
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,

server/src/main/java/org/opensearch/ingest/IngestService.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.opensearch.common.collect.Tuple;
6363
import org.opensearch.common.metrics.OperationMetrics;
6464
import org.opensearch.common.regex.Regex;
65+
import org.opensearch.common.settings.Setting;
6566
import org.opensearch.common.settings.Settings;
6667
import org.opensearch.common.unit.TimeValue;
6768
import org.opensearch.common.util.concurrent.AbstractRunnable;
@@ -107,6 +108,18 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
107108

108109
public static final String INGEST_ORIGIN = "ingest";
109110

111+
/**
112+
* Defines the limit for the number of processors which can run on a given document during ingestion.
113+
*/
114+
public static final Setting<Integer> MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting(
115+
"cluster.ingest.max_number_processors",
116+
Integer.MAX_VALUE,
117+
1,
118+
Integer.MAX_VALUE,
119+
Setting.Property.NodeScope,
120+
Setting.Property.Dynamic
121+
);
122+
110123
private static final Logger logger = LogManager.getLogger(IngestService.class);
111124

112125
private final ClusterService clusterService;
@@ -123,6 +136,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
123136
private final ClusterManagerTaskThrottler.ThrottlingKey putPipelineTaskKey;
124137
private final ClusterManagerTaskThrottler.ThrottlingKey deletePipelineTaskKey;
125138
private volatile ClusterState state;
139+
private volatile int maxIngestProcessorCount;
126140

127141
public IngestService(
128142
ClusterService clusterService,
@@ -156,6 +170,12 @@ public IngestService(
156170
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
157171
putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_PIPELINE_KEY, true);
158172
deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_PIPELINE_KEY, true);
173+
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_NUMBER_OF_INGEST_PROCESSORS, this::setMaxIngestProcessorCount);
174+
setMaxIngestProcessorCount(clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS));
175+
}
176+
177+
private void setMaxIngestProcessorCount(Integer maxIngestProcessorCount) {
178+
this.maxIngestProcessorCount = maxIngestProcessorCount;
159179
}
160180

161181
private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
@@ -494,6 +514,9 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
494514

495515
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2();
496516
Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService);
517+
518+
validateProcessorCountForIngestPipeline(pipeline);
519+
497520
List<Exception> exceptions = new ArrayList<>();
498521
for (Processor processor : pipeline.flattenAllProcessors()) {
499522
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
@@ -507,6 +530,20 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
507530
ExceptionsHelper.rethrowAndSuppress(exceptions);
508531
}
509532

533+
public void validateProcessorCountForIngestPipeline(Pipeline pipeline) {
534+
List<Processor> processors = pipeline.flattenAllProcessors();
535+
536+
if (processors.size() > maxIngestProcessorCount) {
537+
throw new IllegalStateException(
538+
"Cannot use more than the maximum processors allowed. Number of processors being configured is ["
539+
+ processors.size()
540+
+ "] which exceeds the maximum allowed configuration of ["
541+
+ maxIngestProcessorCount
542+
+ "] processors."
543+
);
544+
}
545+
}
546+
510547
public void executeBulkRequest(
511548
int numberOfActionRequests,
512549
Iterable<DocWriteRequest<?>> actionRequests,

server/src/test/java/org/opensearch/action/ingest/SimulateExecutionServiceTests.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.ingest.DropProcessor;
4040
import org.opensearch.ingest.IngestDocument;
4141
import org.opensearch.ingest.IngestProcessorException;
42+
import org.opensearch.ingest.IngestService;
4243
import org.opensearch.ingest.Pipeline;
4344
import org.opensearch.ingest.Processor;
4445
import org.opensearch.ingest.RandomDocumentPicks;
@@ -67,6 +68,8 @@
6768
import static org.hamcrest.Matchers.notNullValue;
6869
import static org.hamcrest.Matchers.nullValue;
6970
import static org.hamcrest.Matchers.sameInstance;
71+
import static org.mockito.Mockito.doThrow;
72+
import static org.mockito.Mockito.mock;
7073

7174
public class SimulateExecutionServiceTests extends OpenSearchTestCase {
7275

@@ -75,11 +78,13 @@ public class SimulateExecutionServiceTests extends OpenSearchTestCase {
7578
private TestThreadPool threadPool;
7679
private SimulateExecutionService executionService;
7780
private IngestDocument ingestDocument;
81+
private IngestService ingestService;
7882

7983
@Before
8084
public void setup() {
85+
ingestService = mock(IngestService.class);
8186
threadPool = new TestThreadPool(SimulateExecutionServiceTests.class.getSimpleName());
82-
executionService = new SimulateExecutionService(threadPool);
87+
executionService = new SimulateExecutionService(threadPool, ingestService);
8388
ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
8489
}
8590

@@ -400,6 +405,22 @@ public String getType() {
400405
}
401406
}
402407

408+
public void testValidateProcessorCountForIngestPipelineThrowsException() {
409+
410+
int numDocs = randomIntBetween(1, 64);
411+
List<IngestDocument> documents = new ArrayList<>(numDocs);
412+
for (int id = 0; id < numDocs; id++) {
413+
documents.add(new IngestDocument("_index", Integer.toString(id), null, 0L, VersionType.INTERNAL, new HashMap<>()));
414+
}
415+
416+
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor());
417+
SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false);
418+
419+
doThrow(new IllegalStateException()).when(ingestService).validateProcessorCountForIngestPipeline(pipeline);
420+
421+
expectThrows(IllegalStateException.class, () -> executionService.execute(request, ActionListener.wrap(response -> {}, e -> {})));
422+
}
423+
403424
private static void assertVerboseResult(
404425
SimulateProcessorResult result,
405426
String expectedPipelineId,

0 commit comments

Comments
 (0)