Skip to content

Commit dde6aab

Browse files
jowg-amazongoyamegheirsep
authored
* Bulk index findings and sequentially invoke auto-correlations (opensearch-project#1355) * Bulk index findings and sequentially invoke auto-correlations Signed-off-by: Megha Goyal <[email protected]> * Bulk index findings in batches of 10000 and make it configurable Signed-off-by: Megha Goyal <[email protected]> * Addressing review comments Signed-off-by: Megha Goyal <[email protected]> * Add integ tests to test bulk index findings Signed-off-by: Megha Goyal <[email protected]> * Fix ktlint formatting Signed-off-by: Megha Goyal <[email protected]> --------- Signed-off-by: Megha Goyal <[email protected]> * Add jvm aware setting and max num docs settings for batching docs for percolate queries (opensearch-project#1435) * add jvm aware and max docs settings for batching docs for percolate queries Signed-off-by: Surya Sashank Nistala <[email protected]> * fix stats logging Signed-off-by: Surya Sashank Nistala <[email protected]> * add queryfieldnames field in findings mapping Signed-off-by: Surya Sashank Nistala <[email protected]> --------- Signed-off-by: Surya Sashank Nistala <[email protected]> * optimize to fetch only fields relevant to doc level queries in doc level monitor instead of entire _source for each doc (opensearch-project#1441) * optimize to fetch only fields relevant to doc level queries in doc level monitor Signed-off-by: Surya Sashank Nistala <[email protected]> * fix test for settings check Signed-off-by: Surya Sashank Nistala <[email protected]> * fix ktlint Signed-off-by: Surya Sashank Nistala <[email protected]> --------- Signed-off-by: Surya Sashank Nistala <[email protected]> * fix integTests Signed-off-by: Joanne Wang <[email protected]> * clean up doc level queries on dry run (opensearch-project#1430) Signed-off-by: Joanne Wang <[email protected]> * optimize sequence number calculation and reduce search requests in doc level monitor execution (opensearch-project#1445) * optimize sequence number calculation and reduce search requests by n where n is number of shards being queried in the executino Signed-off-by: Surya Sashank Nistala <[email protected]> * fix tests Signed-off-by: Surya Sashank Nistala <[email protected]> * optimize check indices and execute to query only write index of aliases and datastreams during monitor creation Signed-off-by: Surya Sashank Nistala <[email protected]> * fix test Signed-off-by: Surya Sashank Nistala <[email protected]> * add javadoc Signed-off-by: Surya Sashank Nistala <[email protected]> * add tests to verify seq_no calculation Signed-off-by: Surya Sashank Nistala <[email protected]> --------- Signed-off-by: Surya Sashank Nistala <[email protected]> * fix integ tests again Signed-off-by: Joanne Wang <[email protected]> --------- Signed-off-by: Megha Goyal <[email protected]> Signed-off-by: Surya Sashank Nistala <[email protected]> Signed-off-by: Joanne Wang <[email protected]> Co-authored-by: Megha Goyal <[email protected]> Co-authored-by: Surya Sashank Nistala <[email protected]>
1 parent b25a3b2 commit dde6aab

File tree

13 files changed

+1204
-213
lines changed

13 files changed

+1204
-213
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import org.opensearch.alerting.resthandler.RestSearchMonitorAction
4646
import org.opensearch.alerting.script.TriggerScript
4747
import org.opensearch.alerting.service.DeleteMonitorService
4848
import org.opensearch.alerting.settings.AlertingSettings
49+
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE
4950
import org.opensearch.alerting.settings.DestinationSettings
5051
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
5152
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
@@ -99,6 +100,7 @@ import org.opensearch.core.xcontent.XContentParser
99100
import org.opensearch.env.Environment
100101
import org.opensearch.env.NodeEnvironment
101102
import org.opensearch.index.IndexModule
103+
import org.opensearch.monitor.jvm.JvmStats
102104
import org.opensearch.painless.spi.PainlessExtension
103105
import org.opensearch.painless.spi.Whitelist
104106
import org.opensearch.painless.spi.WhitelistLoader
@@ -268,6 +270,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
268270
.registerTriggerService(TriggerService(scriptService))
269271
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
270272
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
273+
.registerJvmStats(JvmStats.jvmStats())
271274
.registerWorkflowService(WorkflowService(client, xContentRegistry))
272275
.registerConsumers()
273276
.registerDestinationSettings()
@@ -325,6 +328,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
325328
AlertingSettings.ALERT_HISTORY_MAX_DOCS,
326329
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
327330
AlertingSettings.ALERTING_MAX_MONITORS,
331+
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
332+
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
333+
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY,
328334
AlertingSettings.REQUEST_TIMEOUT,
329335
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
330336
AlertingSettings.FILTER_BY_BACKEND_ROLES,
@@ -345,6 +351,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
345351
LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT,
346352
LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE,
347353
LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES,
354+
AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED,
348355
DestinationSettings.EMAIL_USERNAME,
349356
DestinationSettings.EMAIL_PASSWORD,
350357
DestinationSettings.ALLOW_LIST,
@@ -357,7 +364,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
357364
AlertingSettings.FINDING_HISTORY_MAX_DOCS,
358365
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
359366
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
360-
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
367+
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
368+
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
361369
)
362370
}
363371

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

+442-186
Large diffs are not rendered by default.

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

+10-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import org.opensearch.cluster.service.ClusterService
1818
import org.opensearch.common.settings.Settings
1919
import org.opensearch.common.unit.TimeValue
2020
import org.opensearch.core.xcontent.NamedXContentRegistry
21+
import org.opensearch.monitor.jvm.JvmStats
2122
import org.opensearch.script.ScriptService
2223
import org.opensearch.threadpool.ThreadPool
2324

@@ -36,6 +37,7 @@ data class MonitorRunnerExecutionContext(
3637
var alertService: AlertService? = null,
3738
var docLevelMonitorQueries: DocLevelMonitorQueries? = null,
3839
var workflowService: WorkflowService? = null,
40+
var jvmStats: JvmStats? = null,
3941

4042
@Volatile var retryPolicy: BackoffPolicy? = null,
4143
@Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null,
@@ -47,5 +49,12 @@ data class MonitorRunnerExecutionContext(
4749
@Volatile var destinationContextFactory: DestinationContextFactory? = null,
4850

4951
@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
50-
@Volatile var indexTimeout: TimeValue? = null
52+
@Volatile var indexTimeout: TimeValue? = null,
53+
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
54+
@Volatile var fetchOnlyQueryFieldNames: Boolean = true,
55+
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
56+
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int =
57+
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
58+
@Volatile var docLevelMonitorShardFetchSize: Int =
59+
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
5160
)

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

+50-1
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,18 @@ import org.opensearch.alerting.model.WorkflowRunResult
2222
import org.opensearch.alerting.model.destination.DestinationContextFactory
2323
import org.opensearch.alerting.opensearchapi.retry
2424
import org.opensearch.alerting.script.TriggerExecutionContext
25+
import org.opensearch.alerting.settings.AlertingSettings
2526
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
2627
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
28+
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
29+
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE
30+
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
2731
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
2832
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
2933
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
3034
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
35+
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
36+
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY
3137
import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST
3238
import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST
3339
import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings
@@ -48,6 +54,7 @@ import org.opensearch.commons.alerting.model.action.Action
4854
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
4955
import org.opensearch.core.action.ActionListener
5056
import org.opensearch.core.xcontent.NamedXContentRegistry
57+
import org.opensearch.monitor.jvm.JvmStats
5158
import org.opensearch.script.Script
5259
import org.opensearch.script.ScriptService
5360
import org.opensearch.script.TemplateScript
@@ -132,6 +139,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
132139
return this
133140
}
134141

142+
fun registerJvmStats(jvmStats: JvmStats): MonitorRunnerService {
143+
this.monitorCtx.jvmStats = jvmStats
144+
return this
145+
}
146+
135147
// Must be called after registerClusterService and registerSettings in AlertingPlugin
136148
fun registerConsumers(): MonitorRunnerService {
137149
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(
@@ -169,6 +181,35 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
169181

170182
monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings)
171183

184+
monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
185+
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) {
186+
monitorCtx.findingsIndexBatchSize = it
187+
}
188+
189+
monitorCtx.fetchOnlyQueryFieldNames = DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings)
190+
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED) {
191+
monitorCtx.fetchOnlyQueryFieldNames = it
192+
}
193+
194+
monitorCtx.percQueryMaxNumDocsInMemory = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings)
195+
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) {
196+
monitorCtx.percQueryMaxNumDocsInMemory = it
197+
}
198+
199+
monitorCtx.percQueryDocsSizeMemoryPercentageLimit =
200+
PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings)
201+
monitorCtx.clusterService!!.clusterSettings
202+
.addSettingsUpdateConsumer(PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT) {
203+
monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it
204+
}
205+
206+
monitorCtx.docLevelMonitorShardFetchSize =
207+
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.get(monitorCtx.settings)
208+
monitorCtx.clusterService!!.clusterSettings
209+
.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE) {
210+
monitorCtx.docLevelMonitorShardFetchSize = it
211+
}
212+
172213
return this
173214
}
174215

@@ -251,11 +292,19 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
251292
when (job) {
252293
is Workflow -> {
253294
launch {
295+
logger.debug(
296+
"PERF_DEBUG: executing workflow ${job.id} on node " +
297+
monitorCtx.clusterService!!.state().nodes().localNode.id
298+
)
254299
runJob(job, periodStart, periodEnd, false)
255300
}
256301
}
257302
is Monitor -> {
258303
launch {
304+
logger.debug(
305+
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
306+
monitorCtx.clusterService!!.state().nodes().localNode.id
307+
)
259308
runJob(job, periodStart, periodEnd, false)
260309
}
261310
}
@@ -300,7 +349,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
300349
val runResult = if (monitor.isBucketLevelMonitor()) {
301350
BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
302351
} else if (monitor.isDocLevelMonitor()) {
303-
DocumentLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
352+
DocumentLevelMonitorRunner().runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
304353
} else {
305354
QueryLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
306355
}

alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt

-14
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.model
7+
8+
import org.opensearch.commons.alerting.model.DocLevelQuery
9+
10+
/** DTO that contains all the necessary context for fetching data from shard and performing percolate queries */
11+
data class IndexExecutionContext(
12+
val queries: List<DocLevelQuery>,
13+
val lastRunContext: MutableMap<String, Any>,
14+
val updatedLastRunContext: MutableMap<String, Any>,
15+
val indexName: String,
16+
val concreteIndexName: String,
17+
val conflictingFields: List<String>,
18+
val docIds: List<String>? = null,
19+
)

alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt

+54
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ class AlertingSettings {
1717

1818
companion object {
1919
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
20+
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000
21+
const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000
22+
const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10
23+
const val DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = 10000
2024

2125
val ALERTING_MAX_MONITORS = Setting.intSetting(
2226
"plugins.alerting.monitor.max_monitors",
@@ -25,6 +29,49 @@ class AlertingSettings {
2529
Setting.Property.Dynamic
2630
)
2731

32+
/** Defines the threshold percentage of heap size in bytes till which we accumulate docs in memory before we query against percolate query
33+
* index in document level monitor execution.
34+
*/
35+
val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting(
36+
"plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit",
37+
10,
38+
0,
39+
100,
40+
Setting.Property.NodeScope, Setting.Property.Dynamic
41+
)
42+
43+
/** Purely a setting used to verify seq_no calculation
44+
*/
45+
val DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = Setting.intSetting(
46+
"plugins.alerting.monitor.doc_level_monitor_shard_fetch_size",
47+
DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
48+
1,
49+
10000,
50+
Setting.Property.NodeScope, Setting.Property.Dynamic
51+
)
52+
53+
/** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document
54+
* level monitor execution. The docs are being collected from searching on shards of indices mentioned in the
55+
* monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate
56+
* query with the current set of docs and clear the cache and repeat the process till we have queried all indices in current
57+
* execution
58+
*/
59+
val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting(
60+
"plugins.alerting.monitor.percolate_query_max_num_docs_in_memory",
61+
DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, 1000,
62+
Setting.Property.NodeScope, Setting.Property.Dynamic
63+
)
64+
65+
/**
66+
* Boolean setting to enable/disable optimizing doc level monitors by fetchign only fields mentioned in queries.
67+
* Enabled by default. If disabled, will fetch entire source of documents while fetch data from shards.
68+
*/
69+
val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting(
70+
"plugins.alerting.monitor.doc_level_monitor_query_field_names_enabled",
71+
true,
72+
Setting.Property.NodeScope, Setting.Property.Dynamic
73+
)
74+
2875
val INPUT_TIMEOUT = Setting.positiveTimeSetting(
2976
"plugins.alerting.input_timeout",
3077
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
@@ -176,5 +223,12 @@ class AlertingSettings {
176223
Setting.Property.NodeScope,
177224
Setting.Property.Dynamic
178225
)
226+
227+
val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting(
228+
"plugins.alerting.alert_findings_indexing_batch_size",
229+
DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
230+
1,
231+
Setting.Property.NodeScope, Setting.Property.Dynamic
232+
)
179233
}
180234
}

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,15 @@ class TransportIndexMonitorAction @Inject constructor(
198198
else (it as DocLevelMonitorInput).indices
199199
indices.addAll(inputIndices)
200200
}
201-
val searchRequest = SearchRequest().indices(*indices.toTypedArray())
201+
val updatedIndices = indices.map { index ->
202+
if (IndexUtils.isAlias(index, clusterService.state()) || IndexUtils.isDataStream(index, clusterService.state())) {
203+
val metadata = clusterService.state().metadata.indicesLookup[index]?.writeIndex
204+
metadata?.index?.name ?: index
205+
} else {
206+
index
207+
}
208+
}
209+
val searchRequest = SearchRequest().indices(*updatedIndices.toTypedArray())
202210
.source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery()))
203211
client.search(
204212
searchRequest,

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

+46
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import org.opensearch.action.admin.indices.alias.Alias
1313
import org.opensearch.action.admin.indices.create.CreateIndexRequest
1414
import org.opensearch.action.admin.indices.create.CreateIndexResponse
1515
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
16+
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
17+
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
1618
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
1719
import org.opensearch.action.admin.indices.rollover.RolloverRequest
1820
import org.opensearch.action.admin.indices.rollover.RolloverResponse
@@ -38,8 +40,16 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
3840
import org.opensearch.commons.alerting.model.DocLevelQuery
3941
import org.opensearch.commons.alerting.model.Monitor
4042
import org.opensearch.commons.alerting.model.ScheduledJob
43+
import org.opensearch.core.action.ActionListener
4144
import org.opensearch.core.rest.RestStatus
4245
import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING
46+
import org.opensearch.index.query.QueryBuilders
47+
import org.opensearch.index.reindex.BulkByScrollResponse
48+
import org.opensearch.index.reindex.DeleteByQueryAction
49+
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
50+
import kotlin.coroutines.resume
51+
import kotlin.coroutines.resumeWithException
52+
import kotlin.coroutines.suspendCoroutine
4353

4454
private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)
4555

@@ -134,6 +144,42 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
134144
return true
135145
}
136146

147+
suspend fun deleteDocLevelQueriesOnDryRun(monitorMetadata: MonitorMetadata) {
148+
try {
149+
monitorMetadata.sourceToQueryIndexMapping.forEach { (_, queryIndex) ->
150+
val indicesExistsResponse: IndicesExistsResponse =
151+
client.suspendUntil {
152+
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
153+
}
154+
if (indicesExistsResponse.isExists == false) {
155+
return
156+
}
157+
158+
val queryBuilder = QueryBuilders.boolQuery()
159+
.must(QueryBuilders.existsQuery("monitor_id"))
160+
.mustNot(QueryBuilders.wildcardQuery("monitor_id", "*"))
161+
162+
val response: BulkByScrollResponse = suspendCoroutine { cont ->
163+
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
164+
.source(queryIndex)
165+
.filter(queryBuilder)
166+
.refresh(true)
167+
.execute(
168+
object : ActionListener<BulkByScrollResponse> {
169+
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
170+
override fun onFailure(t: Exception) = cont.resumeWithException(t)
171+
}
172+
)
173+
}
174+
response.bulkFailures.forEach {
175+
log.error("Failed deleting queries while removing dry run queries: [${it.id}] cause: [${it.cause}] ")
176+
}
177+
}
178+
} catch (e: Exception) {
179+
log.error("Failed to delete doc level queries on dry run", e)
180+
}
181+
}
182+
137183
fun docLevelQueryIndexExists(dataSources: DataSources): Boolean {
138184
val clusterState = clusterService.state()
139185
return clusterState.metadata.hasAlias(dataSources.queryIndex)

alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
255255
executionId
256256
)
257257
} else if (delegateMonitor.isDocLevelMonitor()) {
258-
return DocumentLevelMonitorRunner.runMonitor(
258+
return DocumentLevelMonitorRunner().runMonitor(
259259
delegateMonitor,
260260
monitorCtx,
261261
periodStart,

0 commit comments

Comments
 (0)