Skip to content

Commit b561965

Browse files
authored
Bulk index findings and sequentially invoke auto-correlations (opensearch-project#1355)
* Bulk index findings and sequentially invoke auto-correlations
  Signed-off-by: Megha Goyal <[email protected]>
* Bulk index findings in batches of 10000 and make it configurable
  Signed-off-by: Megha Goyal <[email protected]>
* Addressing review comments
  Signed-off-by: Megha Goyal <[email protected]>
* Add integ tests to test bulk index findings
  Signed-off-by: Megha Goyal <[email protected]>
* Fix ktlint formatting
  Signed-off-by: Megha Goyal <[email protected]>
---------
Signed-off-by: Megha Goyal <[email protected]>
1 parent 8d59060 commit b561965

File tree

6 files changed

+159
-55
lines changed

6 files changed

+159
-55
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
346346
AlertingSettings.FINDING_HISTORY_MAX_DOCS,
347347
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
348348
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
349-
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
349+
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
350+
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
350351
)
351352
}
352353

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 92 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@ package org.opensearch.alerting
88
import org.apache.logging.log4j.LogManager
99
import org.opensearch.ExceptionsHelper
1010
import org.opensearch.OpenSearchStatusException
11+
import org.opensearch.action.DocWriteRequest
12+
import org.opensearch.action.admin.indices.refresh.RefreshAction
13+
import org.opensearch.action.admin.indices.refresh.RefreshRequest
14+
import org.opensearch.action.bulk.BulkRequest
15+
import org.opensearch.action.bulk.BulkResponse
1116
import org.opensearch.action.index.IndexRequest
12-
import org.opensearch.action.index.IndexResponse
1317
import org.opensearch.action.search.SearchAction
1418
import org.opensearch.action.search.SearchRequest
1519
import org.opensearch.action.search.SearchResponse
16-
import org.opensearch.action.support.WriteRequest
1720
import org.opensearch.alerting.model.DocumentExecutionContext
1821
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
1922
import org.opensearch.alerting.model.InputRunResults
@@ -273,10 +276,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
273276
// If there are no triggers defined, we still want to generate findings
274277
if (monitor.triggers.isEmpty()) {
275278
if (dryrun == false && monitor.id != Monitor.NO_ID) {
276-
docsToQueries.forEach {
277-
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
278-
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
279-
}
279+
createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
280280
}
281281
} else {
282282
monitor.triggers.forEach {
@@ -365,7 +365,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
365365
trigger: DocumentLevelTrigger,
366366
monitor: Monitor,
367367
idQueryMap: Map<String, DocLevelQuery>,
368-
docsToQueries: Map<String, List<String>>,
368+
docsToQueries: MutableMap<String, MutableList<String>>,
369369
queryToDocIds: Map<DocLevelQuery, Set<String>>,
370370
dryrun: Boolean,
371371
workflowRunContext: WorkflowRunContext?,
@@ -374,35 +374,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
374374
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
375375
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)
376376

377-
val findings = mutableListOf<String>()
378-
val findingDocPairs = mutableListOf<Pair<String, String>>()
377+
val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()
379378

380379
// TODO: Implement throttling for findings
381-
docsToQueries.forEach {
382-
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
383-
val findingId = createFindings(
384-
monitor,
385-
monitorCtx,
386-
triggeredQueries,
387-
it.key,
388-
!dryrun && monitor.id != Monitor.NO_ID,
389-
executionId
390-
)
391-
findings.add(findingId)
380+
val findingToDocPairs = createFindings(
381+
monitor,
382+
monitorCtx,
383+
docsToQueries,
384+
idQueryMap,
385+
!dryrun && monitor.id != Monitor.NO_ID,
386+
executionId
387+
)
392388

393-
if (triggerResult.triggeredDocs.contains(it.key)) {
394-
findingDocPairs.add(Pair(findingId, it.key))
389+
findingToDocPairs.forEach {
390+
// Only pick those entries whose docs have triggers associated with them
391+
if (triggerResult.triggeredDocs.contains(it.second)) {
392+
triggerFindingDocPairs.add(Pair(it.first, it.second))
395393
}
396394
}
397395

398396
val actionCtx = triggerCtx.copy(
399397
triggeredDocs = triggerResult.triggeredDocs,
400-
relatedFindings = findings,
398+
relatedFindings = findingToDocPairs.map { it.first },
401399
error = monitorResult.error ?: triggerResult.error
402400
)
403401

404402
val alerts = mutableListOf<Alert>()
405-
findingDocPairs.forEach {
403+
triggerFindingDocPairs.forEach {
406404
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
407405
listOf(it.first),
408406
listOf(it.second),
@@ -461,51 +459,92 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
461459
return triggerResult
462460
}
463461

462+
/**
463+
* 1. Bulk index all findings based on shouldCreateFinding flag
464+
* 2. invoke publishFinding() to kickstart auto-correlations
465+
* 3. Returns a list of pairs for finding id to doc id
466+
*/
464467
private suspend fun createFindings(
465468
monitor: Monitor,
466469
monitorCtx: MonitorRunnerExecutionContext,
467-
docLevelQueries: List<DocLevelQuery>,
468-
matchingDocId: String,
470+
docsToQueries: MutableMap<String, MutableList<String>>,
471+
idQueryMap: Map<String, DocLevelQuery>,
469472
shouldCreateFinding: Boolean,
470473
workflowExecutionId: String? = null,
471-
): String {
472-
// Before the "|" is the doc id and after the "|" is the index
473-
val docIndex = matchingDocId.split("|")
474+
): List<Pair<String, String>> {
474475

475-
val finding = Finding(
476-
id = UUID.randomUUID().toString(),
477-
relatedDocIds = listOf(docIndex[0]),
478-
correlatedDocIds = listOf(docIndex[0]),
479-
monitorId = monitor.id,
480-
monitorName = monitor.name,
481-
index = docIndex[1],
482-
docLevelQueries = docLevelQueries,
483-
timestamp = Instant.now(),
484-
executionId = workflowExecutionId
485-
)
476+
val findingDocPairs = mutableListOf<Pair<String, String>>()
477+
val findings = mutableListOf<Finding>()
478+
val indexRequests = mutableListOf<IndexRequest>()
486479

487-
val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
488-
logger.debug("Findings: $findingStr")
480+
docsToQueries.forEach {
481+
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
489482

490-
if (shouldCreateFinding) {
491-
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
492-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
493-
.source(findingStr, XContentType.JSON)
494-
.id(finding.id)
495-
.routing(finding.id)
483+
// Before the "|" is the doc id and after the "|" is the index
484+
val docIndex = it.key.split("|")
496485

497-
monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
498-
monitorCtx.client!!.index(indexRequest, it)
486+
val finding = Finding(
487+
id = UUID.randomUUID().toString(),
488+
relatedDocIds = listOf(docIndex[0]),
489+
correlatedDocIds = listOf(docIndex[0]),
490+
monitorId = monitor.id,
491+
monitorName = monitor.name,
492+
index = docIndex[1],
493+
docLevelQueries = triggeredQueries,
494+
timestamp = Instant.now(),
495+
executionId = workflowExecutionId
496+
)
497+
findingDocPairs.add(Pair(finding.id, it.key))
498+
findings.add(finding)
499+
500+
val findingStr =
501+
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
502+
.string()
503+
logger.debug("Findings: $findingStr")
504+
505+
if (shouldCreateFinding) {
506+
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
507+
.source(findingStr, XContentType.JSON)
508+
.id(finding.id)
509+
.opType(DocWriteRequest.OpType.CREATE)
499510
}
500511
}
501512

513+
if (indexRequests.isNotEmpty()) {
514+
bulkIndexFindings(monitor, monitorCtx, indexRequests)
515+
}
516+
502517
try {
503-
publishFinding(monitor, monitorCtx, finding)
518+
findings.forEach { finding ->
519+
publishFinding(monitor, monitorCtx, finding)
520+
}
504521
} catch (e: Exception) {
505522
// suppress exception
506523
logger.error("Optional finding callback failed", e)
507524
}
508-
return finding.id
525+
return findingDocPairs
526+
}
527+
528+
private suspend fun bulkIndexFindings(
529+
monitor: Monitor,
530+
monitorCtx: MonitorRunnerExecutionContext,
531+
indexRequests: List<IndexRequest>
532+
) {
533+
indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
534+
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
535+
bulk(BulkRequest().add(batch), it)
536+
}
537+
if (bulkResponse.hasFailures()) {
538+
bulkResponse.items.forEach { item ->
539+
if (item.isFailed) {
540+
logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
541+
}
542+
}
543+
} else {
544+
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
545+
}
546+
}
547+
monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex))
509548
}
510549

511550
private fun publishFinding(
@@ -629,7 +668,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
629668
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
630669
}
631670
} catch (e: Exception) {
632-
logger.warn("Failed to run for shard $shard. Error: ${e.message}")
671+
logger.error("Failed to run for shard $shard. Error: ${e.message}")
633672
}
634673
}
635674
return matchingDocs

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,6 @@ data class MonitorRunnerExecutionContext(
4747
@Volatile var destinationContextFactory: DestinationContextFactory? = null,
4848

4949
@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
50-
@Volatile var indexTimeout: TimeValue? = null
50+
@Volatile var indexTimeout: TimeValue? = null,
51+
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE
5152
)

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ import org.opensearch.alerting.model.WorkflowRunResult
2222
import org.opensearch.alerting.model.destination.DestinationContextFactory
2323
import org.opensearch.alerting.opensearchapi.retry
2424
import org.opensearch.alerting.script.TriggerExecutionContext
25+
import org.opensearch.alerting.settings.AlertingSettings
2526
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
2627
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
28+
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
2729
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
2830
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
2931
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
@@ -169,6 +171,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
169171

170172
monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings)
171173

174+
monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
175+
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) {
176+
monitorCtx.findingsIndexBatchSize = it
177+
}
178+
172179
return this
173180
}
174181

alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class AlertingSettings {
1717

1818
companion object {
1919
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
20+
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000
2021

2122
val ALERTING_MAX_MONITORS = Setting.intSetting(
2223
"plugins.alerting.monitor.max_monitors",
@@ -153,5 +154,12 @@ class AlertingSettings {
153154
-1L,
154155
Setting.Property.NodeScope, Setting.Property.Dynamic
155156
)
157+
158+
val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting(
159+
"plugins.alerting.alert_findings_indexing_batch_size",
160+
DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
161+
1,
162+
Setting.Property.NodeScope, Setting.Property.Dynamic
163+
)
156164
}
157165
}

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,54 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
393393
assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size)
394394
}
395395

396+
fun `test execute monitor for bulk index findings`() {
397+
val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}"
398+
val testQueryName = "wildcard-test-query"
399+
val testIndex = createTestIndex("${testIndexPrefix}1")
400+
val testIndex2 = createTestIndex("${testIndexPrefix}2")
401+
402+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
403+
val testDoc = """{
404+
"message" : "This is an error from IAD region",
405+
"test_strict_date_time" : "$testTime",
406+
"test_field" : "us-west-2"
407+
}"""
408+
409+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName, fields = listOf())
410+
val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery))
411+
412+
val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]"))
413+
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
414+
assertNotNull(monitor.id)
415+
416+
for (i in 0 until 9) {
417+
indexDoc(testIndex, i.toString(), testDoc)
418+
}
419+
indexDoc(testIndex2, "3", testDoc)
420+
adminClient().updateSettings("plugins.alerting.alert_findings_indexing_batch_size", 2)
421+
422+
val response = executeMonitor(monitor.id)
423+
424+
val output = entityAsMap(response)
425+
426+
assertEquals(monitor.name, output["monitor_name"])
427+
@Suppress("UNCHECKED_CAST")
428+
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
429+
@Suppress("UNCHECKED_CAST")
430+
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
431+
assertEquals("Correct search result", 10, matchingDocsToQuery.size)
432+
assertTrue("Correct search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "2|$testIndex", "3|$testIndex2")))
433+
434+
val alerts = searchAlertsWithFilter(monitor)
435+
assertEquals("Alert saved for test monitor", 10, alerts.size)
436+
437+
val findings = searchFindings(monitor)
438+
assertEquals("Findings saved for test monitor", 10, findings.size)
439+
val foundFindings =
440+
findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("2") || it.relatedDocIds.contains("3") }
441+
assertEquals("Found findings for all docs", 4, foundFindings.size)
442+
}
443+
396444
fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS query operator`() {
397445
val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}"
398446
val testQueryName = "wildcard-test-query"

0 commit comments

Comments
 (0)